Google Cloud Dataflow での Notebooks の使用

1. はじめに

Cloud-Dataflow.png

Google Cloud Dataflow

最終更新日: 2023 年 7 月 5 日

Dataflow とは

Dataflow は、さまざまなデータ処理パターンの実行に対応したマネージド サービスです。このサイトのドキュメントでは、Dataflow を使用してバッチおよびストリーミングのデータ処理パイプラインをデプロイする方法とサービス機能の使用方法を説明します。

Apache Beam SDK は、バッチとストリーミングの両方のパイプラインの開発に対応したオープンソースのプログラミング モデルです。Apache Beam プログラムでパイプラインを作成し、Dataflow サービスで実行します。Apache Beam のドキュメントには、詳細なコンセプト情報と Apache Beam のプログラミング モデル、SDK、他のランナーのリファレンス情報が記載されています。

高速なストリーミング データ分析

Dataflow を使用すると、データ転送のレイテンシを抑えた、高速で簡素化されたストリーミング データ パイプライン開発が可能になります。

運用と管理を簡素化する

Dataflow のサーバーレス アプローチにより、データ エンジニアリングのワークロードから運用上のオーバーヘッドが取り除かれるため、チームはサーバー クラスタの管理ではなく、プログラミングに専念できます。

総所有コストを削減する

リソースの自動スケーリングとコスト最適化されたバッチ処理機能を組み合わせることにより、Dataflow で実質無制限の容量を利用できます。過剰な費用をかけずに、時季変動したり急変動したりするワークロードを管理できます。

主な機能

自動化されたリソース管理と動的作業再調整

Dataflow は、処理リソースのプロビジョニングと管理を自動化することで、レイテンシを最小限に抑え、使用率を高いレベルで維持します。そのため、手動で追加のインスタンスを起動したり、予約する必要がなくなります。作業配分も自動化、最適化されて、遅れている作業は動的に再調整されます。もうホットキーを追跡したり、入力データを事前に処理する必要はありません。

水平自動スケーリング

ワーカー リソースの水平自動スケーリング機能によりスループットが最適化され、処理性能に対する費用対効果が全体的に向上します。

バッチ処理に適したフレキシブル リソース スケジューリング料金

深夜のジョブなど、スケジュールに柔軟性があるジョブの処理には、フレキシブル リソース スケジューリング(FlexRS)をお使いください。低料金のバッチ処理が可能です。このようなフレキシブル ジョブはキューに入り、6 時間以内に確実にキューから取り出されて実行されます。

この一部として実行する内容

JupyterLab ノートブックで Apache Beam インタラクティブ ランナーで、入力、評価、出力ループ(REPL)ワークフローでパイプラインの反復した開発、パイプライン グラフの検査、個々の PCollection の解析ができます。これらの Apache Beam ノートブックは、Vertex AI Workbench で提供されています。これは、最新のデータ サイエンスと ML フレームワークがプリインストールされたノートブック仮想マシンをホストするマネージド サービスです。

この Codelab では、Apache Beam ノートブックを導入することで実現する機能を中心に取り上げます。

学習内容

  • ノートブック インスタンスを作成する方法
  • 基本的なパイプラインの作成
  • 無制限のソースからデータを読み取る
  • データの可視化
  • ノートブックから Dataflow ジョブを起動する
  • ノートブックの保存

必要なもの

  • 課金が有効になっている Google Cloud Platform プロジェクト。
  • Google Cloud Dataflow と Google Cloud Pub/Sub が有効になっている。

2. 設定方法

  1. Cloud Console のプロジェクト セレクタページで、Cloud プロジェクトを選択または作成します。

次の API が有効になっていることを確認します。

  • Dataflow API
  • Cloud Pub/Sub API
  • Compute Engine
  • Notebooks API

これは、[API とサービス] ページで確認できます。

このガイドでは、Pub/Sub サブスクリプションからデータを読み取るため、Compute Engine のデフォルトのサービス アカウントに編集者ロールが付与されているか、Pub/Sub 編集者ロールを付与されていることを確認してください。

3. Apache Beam ノートブックを使ってみる

Apache Beam ノートブック インスタンスの起動

  1. コンソールで Dataflow を起動します。

  1. 左側のメニューを使用して、[ワークベンチ] ページを選択します。
  2. [ユーザー管理のノートブック] タブが表示されていることを確認します。
  3. ツールバーで [新しいノートブック] をクリックします。
  4. [Apache Beam] > [Withoug GPUs] を選択します。
  5. [新しいノートブック] ページで、ノートブック VM のサブネットワークを選択し、[作成] をクリックします。
  6. リンクがアクティブになったら、[JupyterLab を開く] をクリックします。Vertex AI Workbench が、新しい Apache Beam ノートブック インスタンスを作成します。

4. パイプラインの作成

ノートブック インスタンスの作成

[File] > [New] > [Notebook] に移動し、Apache Beam 2.47 以降のカーネルを選択します。

ノートブックにコードの追加を開始する

  • ノートブックの新しいセルに、各セクションのコードをコピーして貼り付けます。
  • セルを実行する

6bd3dd86cc7cf802.png

JupyterLab ノートブックで Apache Beam インタラクティブ ランナーを使用すると、REPL(Read-Eval-Print Loop)ワークフローで、パイプラインを反復的に開発し、パイプライン グラフを検査し、個々の PCollection を解析できます。

Apache Beam はノートブック インスタンスにインストールされているため、ノートブックには interactive_runner モジュールと interactive_beam モジュールが含まれます。

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

ノートブックで他の Google サービスを使用している場合は、次の import ステートメントを追加します。

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

インタラクティブ オプションの設定

次の例では、データの取得時間を 60 秒に設定します。イテレーションを高速化する場合は、期間を短く設定します(例: 10 秒)。

ib.options.recording_duration = '60s'

その他のインタラクティブ オプションについては、interactive_beam.options クラスをご覧ください。

InteractiveRunner オブジェクトを使用して、パイプラインを初期化します。

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(InteractiveRunner(), options=options)

データの読み取りと可視化

次の例は、指定された Pub/Sub トピックへのサブスクリプションを作成し、サブスクリプションからの読み取りを行う Apache Beam パイプラインを示しています。

words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)

パイプラインは、ソースからウィンドウごとに単語をカウントします。固定されたウィンドウを作成し、各ウィンドウの長さを 10 秒に設定します。

windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

データがウィンドウ処理されると、ウィンドウごとに単語をカウントします。

windowed_word_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

データの可視化

show() メソッドは、作成された PCollection をノートブックに可視化します。

ib.show(windowed_word_counts, include_window_info=True)

PCollection をテーブル形式で可視化する show メソッド。

データの可視化を表示するには、visualize_data=Trueshow() メソッドに渡します。新しいセルを追加します。

ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)

可視化には複数のフィルタを適用できます。次の可視化では、ラベルと軸でフィルタリングできます。

フィルタできる多様な UI 要素のセットとして PCollection を可視化する show メソッド。

5. Pandas DataFrame を使用する

Apache Beam ノートブックの可視化では、Pandas DataFrame も役立ちます。次の例では、最初に単語を小文字に変換してから、各単語の頻度を計算します。

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

collect() メソッドにより、Pandas DataFrame の出力を取得できます。

ib.collect(windowed_lower_word_counts, include_window_info=True)

Pandas DataFrame 内の PCollection を表す collect メソッド。

6. (省略可)ノートブックから Dataflow ジョブを起動する

  1. Dataflow でジョブを実行するには、追加の権限が必要です。Compute Engine のデフォルト サービス アカウントに編集者ロールが付与されていることを確認するか、次の IAM ロールを付与します。
  • Dataflow 管理者
  • Dataflow ワーカー
  • ストレージ管理者
  • サービス アカウント ユーザー(roles/iam.serviceAccountUser)

ロールの詳細については、ドキュメントをご覧ください。

  1. (省略可)ノートブックを使用して Dataflow ジョブを実行する前に、カーネルを再起動し、すべてのセルを再実行して出力を確認します。
  2. 次の import ステートメントを削除します。
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
  1. 次の import ステートメントを追加します。
from apache_beam.runners import DataflowRunner
  1. 次の録画時間のオプションを削除します。
ib.options.recording_duration = '60s'
  1. パイプライン オプションに以下を追加します。Cloud Storage のロケーションを調整して、すでに所有しているバケットを指すようにするか、この目的で新しいバケットを作成する必要があります。us-central1 からリージョン値を変更することもできます。
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
  1. beam.Pipeline() のコンストラクタで、InteractiveRunnerDataflowRunner に置き換えます。p は、パイプラインの作成で作成したパイプライン オブジェクトです。
p = beam.Pipeline(DataflowRunner(), options=options)
  1. インタラクティブな呼び出しをコードから削除します。たとえば、show()collect()head()show_graph()、および watch() をコードから削除します。
  2. 結果を表示するには、シンクを追加する必要があります。前のセクションでは、ノートブックで結果を可視化しましたが、今回は、このノートブックの外側(Dataflow)でジョブを実行します。そのため、結果を保存する外部の場所が必要です。この例では、結果を GCS(Google Cloud Storage)のテキスト ファイルに書き込みます。これはストリーミング パイプラインであるため、データ ウィンドウ処理を使用して、ウィンドウごとに 1 つのテキスト ファイルを作成します。これを行うには、パイプラインに次の手順を追加します。
result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
  1. パイプライン コードの末尾に p.run() を追加します。
  2. ノートブック コードを確認して、すべての変更が組み込まれていることを確認します。次のような設定になっているはずです。
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from apache_beam.runners import DataflowRunner

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location



# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(DataflowRunner(), options=options)
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

windowed_words_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))

p.run()
  1. セルを実行します。
  2. 次のような出力が表示されます。
<DataflowPipelineResult <Job
 clientRequestId: '20230623100011457336-8998'
 createTime: '2023-06-23T10:00:33.447347Z'
 currentStateTime: '1970-01-01T00:00:00Z'
 id: '2023-06-23_03_00_33-11346237320103246437'
 location: 'us-central1'
 name: 'beamapp-root-0623075553-503897-boh4u4wb'
 projectId: 'your-project-id'
 stageStates: []
 startTime: '2023-06-23T10:00:33.447347Z'
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fc7e4084d60>
  1. ジョブが実行されているかどうかを確認するには、Dataflow の [ジョブ] ページに移動します。リストに新しいジョブが表示されます。ジョブがデータの処理を開始するまで約 5 ~ 10 分かかります。
  2. データの処理が開始されたら、Cloud Storage に移動し、Dataflow が結果を保存しているディレクトリ(定義した output_gcs_location)に移動します。テキスト ファイルのリストが表示され、ウィンドウごとに 1 つのファイルが表示されます。bfcc5ce9e46a8b14.png
  3. ファイルをダウンロードして内容を確認します。このファイルには、単語とそのカウントのペアのリストが含まれている必要があります。または、コマンドライン インターフェースを使用してファイルを検査します。これを行うには、ノートブックの新しいセルで次のコマンドを実行します。
! gsutils cat gs://<your-bucket-here>/results/<file-name>
  1. 次のような出力が表示されます。

Safer: 1

trust: 1

mercy: 1

harms: 1

far: 2

fear: 1

than: 1

take: 1

me: 1

goneril: 1

still: 1

away: 1

let: 1

too: 2

the: 1

  1. これで、作成したジョブをクリーンアップして停止することを忘れないでください(この Codelab の最後のステップを参照)。

インタラクティブ ノートブックでこの変換を実行する方法の例については、ノートブック インスタンスの Dataflow Word Count ノートブックを参照してください。

あるいは、ノートブックを実行可能スクリプトとしてエクスポートし、生成された .py ファイルを前の手順で変更してから、Dataflow サービスにパイプラインをデプロイすることもできます。

7. ノートブックの保存

作成したノートブックは、実行中のノートブック インスタンスにローカルに保存されます。開発中にノートブック インスタンスをリセットまたはシャットダウンした場合、これらの新しいノートブックは、/home/jupyter ディレクトリの下に作成される限り保持されます。ただし、ノートブック インスタンスが削除されると、それらのノートブックも削除されます。

ノートブックを後で使用できるようにするには、ワークステーションにローカルにダウンロードするか、GitHub に保存するか、別のファイル形式にエクスポートします。

8. クリーンアップ

Apache Beam ノートブック インスタンスの使用が終了したら、ノートブック インスタンスをシャットダウンし、ストリーミング ジョブを実行した場合は ストリーミング ジョブを停止して、Google Cloud で作成したリソースをクリーンアップします。

また、この Codelab 専用のプロジェクトを作成した場合は、プロジェクトを完全にシャットダウンすることもできます。