1. 概要

Dataflow とは
Dataflow は、さまざまなデータ処理パターンの実行に対応したマネージド サービスです。このサイトのドキュメントでは、Dataflow を使用してバッチおよびストリーミングのデータ処理パイプラインをデプロイする方法とサービス機能の使用方法を説明します。
Apache Beam SDK は、バッチとストリーミングの両方のパイプラインの開発に対応したオープンソースのプログラミング モデルです。Apache Beam プログラムでパイプラインを作成し、Dataflow サービスで実行します。Apache Beam のドキュメントには、詳細なコンセプト情報と Apache Beam のプログラミング モデル、SDK、他のランナーのリファレンス情報が記載されています。
高速なストリーミング データ分析
Dataflow を使用すると、データ転送のレイテンシを抑えた、高速で簡素化されたストリーミング データ パイプライン開発が可能になります。
運用と管理を簡素化する
Dataflow のサーバーレス アプローチにより、データ エンジニアリングのワークロードから運用上のオーバーヘッドが取り除かれるため、チームはサーバー クラスタの管理ではなく、プログラミングに専念できます。
総所有コストを削減する
リソースの自動スケーリングとコスト最適化されたバッチ処理機能を組み合わせることにより、Dataflow で実質無制限の容量を利用できます。過剰な費用をかけずに、時季変動したり急変動したりするワークロードを管理できます。
主な機能
自動化されたリソース管理と動的作業再調整
Dataflow は、処理リソースのプロビジョニングと管理を自動化することで、レイテンシを最小限に抑え、使用率を高いレベルで維持します。そのため、手動で追加のインスタンスを起動したり、予約する必要がなくなります。作業配分も自動化、最適化されて、遅れている作業は動的に再調整されます。もうホットキーを追跡したり、入力データを事前に処理する必要はありません。
水平自動スケーリング
ワーカー リソースの水平自動スケーリング機能によりスループットが最適化され、処理性能に対する費用対効果が全体的に向上します。
バッチ処理に適したフレキシブル リソース スケジューリング料金
深夜のジョブなど、スケジュールに柔軟性があるジョブの処理には、フレキシブル リソース スケジューリング(FlexRS)をお使いください。低料金のバッチ処理が可能です。このようなフレキシブル ジョブはキューに入り、6 時間以内に確実にキューから取り出されて実行されます。
このチュートリアルは、https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven を基に作成されています。
学習内容
- Java SDK を使用して Apache Beam で Maven プロジェクトを作成する方法
- Google Cloud Platform Console を使用してサンプル パイプラインを実行する
- 関連する Cloud Storage バケットとその中身を削除する
必要なもの
このチュートリアルの利用方法をお選びください。
Google Cloud Platform サービスのご利用経験についてどのように評価されますか?
2. 設定と要件
セルフペース型の環境設定
- Cloud Console にログインし、新しいプロジェクトを作成するか、既存のプロジェクトを再利用します(Gmail アカウントまたは G Suite アカウントをお持ちでない場合は、アカウントを作成する必要があります)。
プロジェクト ID を忘れないようにしてください。プロジェクト ID はすべての Google Cloud プロジェクトを通じて一意の名前にする必要があります(上記の名前はすでに使用されているので使用できません)。以降、このコードラボでは PROJECT_ID と呼びます。
- 次に、Google Cloud リソースを使用するために、Cloud Console で課金を有効にする必要があります。
このコードラボを実行しても、費用はほとんどかからないはずです。このチュートリアル以外で請求が発生しないように、リソースのシャットダウン方法を説明する「クリーンアップ」セクションの手順に従うようにしてください。Google Cloud の新規ユーザーは、300 米ドル分の無料トライアル プログラムをご利用いただけます。
API を有効にする
画面の左上にあるメニュー アイコンをクリックします。

プルダウンから [API とサービス > ダッシュボード] を選択します。

[API とサービスを有効にする] を選択します。

検索ボックスで「Compute Engine」を検索します。表示された検索結果のリストで、[Compute Engine API] をクリックします。

[Google Compute Engine] ページで、[有効にする] をクリックします。

有効になったら、矢印をクリックして戻ります。
次の API を検索して有効にします。
- Cloud Dataflow
- Stackdriver
- Cloud Storage
- Cloud Storage JSON
- BigQuery
- Cloud Pub/Sub
- Cloud Datastore
- Cloud Resource Manager API
3. 新しい Cloud Storage バケットを作成する
Google Cloud Platform Console で、画面左上のメニュー アイコンをクリックします。

下にスクロールして、[ストレージ] サブセクションで [Cloud Storage] > [ブラウザ] を選択します。

Cloud Storage ブラウザが表示されます。現在 Cloud Storage バケットがないプロジェクトを使用している場合は、新しいバケットを作成するよう求めるメッセージが表示されます。[バケットを作成] ボタンをクリックして作成します。

バケットの名前を入力します。ダイアログ ボックスに記載されているように、バケット名は Cloud Storage 全体で一意である必要があります。そのため、「test」のようなわかりやすい名前を選択すると、その名前のバケットがすでに他のユーザーによって作成されている可能性が高く、エラーが表示されます。
バケット名に使用できる文字に関するルールもあります。バケット名の先頭と末尾を英字または数字にし、中間にはダッシュのみを使用すれば問題ありません。特殊文字を使用しようとした場合や、バケット名の先頭または末尾に文字または数字以外の文字を使用しようとした場合は、ダイアログ ボックスにルールが表示されます。

バケットの一意の名前を入力して、[作成] をクリックします。すでに使用されているものを選択すると、上記のエラー メッセージが表示されます。バケットが正常に作成されると、ブラウザで新しい空のバケットが表示されます。

バケット名はすべてのプロジェクトで一意である必要があるため、表示されるバケット名は異なります。
4. Cloud Shell を起動する
Cloud Shell をアクティブにする
- Cloud Console で、[Cloud Shell をアクティブにする]
をクリックします。
Cloud Shell を起動したことがない場合、その内容を説明する中間画面が(スクロールしなければ見えない範囲に)が表示されます。その場合は、[続行] をクリックします(以後表示されなくなります)。このワンタイム スクリーンは次のようになります。
Cloud Shell のプロビジョニングと接続に少し時間がかかる程度です。
この仮想マシンには、必要な開発ツールがすべて準備されています。5 GB の永続ホーム ディレクトリが用意されており、Google Cloud で稼働するため、ネットワーク パフォーマンスが充実しており認証もスムーズです。このコードラボでの作業のほとんどは、ブラウザまたは Chromebook から実行できます。
Cloud Shell に接続すると、すでに認証は完了しており、プロジェクトに各自のプロジェクト ID が設定されていることがわかります。
- Cloud Shell で次のコマンドを実行して、認証されたことを確認します。
gcloud auth list
コマンド出力
Credentialed Accounts
ACTIVE ACCOUNT
* <my_account>@<my_domain.com>
To set the active account, run:
$ gcloud config set account `ACCOUNT`
gcloud config list project
コマンド出力
[core] project = <PROJECT_ID>
上記のようになっていない場合は、次のコマンドで設定できます。
gcloud config set project <PROJECT_ID>
コマンド出力
Updated property [core/project].
5. Maven プロジェクトを作成する
Cloud Shell が起動したら、Apache Beam 用 Java SDK を使用して Maven プロジェクトを作成します。
Apache Beam は、データ パイプライン用のオープンソースのプログラミング モデルです。Apache Beam プログラムでこのようなパイプラインを定義し、パイプラインを実行する Dataflow などのランナーを選択できます。
シェルで次のように mvn archetype:generate コマンドを実行します。
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=2.46.0 \
-DgroupId=org.example \
-DartifactId=first-dataflow \
-Dversion="0.1" \
-Dpackage=org.apache.beam.examples \
-DinteractiveMode=false
コマンドを実行すると、現在のディレクトリの下に first-dataflow という新しいディレクトリが表示されます。first-dataflow には、Cloud Dataflow SDK for Java とサンプル パイプラインを含む Maven プロジェクトが含まれています。
6. Cloud Dataflow でテキスト処理パイプラインを実行する
まず、プロジェクト ID と Cloud Storage バケット名を環境変数として保存します。環境変数にキーを保存するには、Cloud Shell で次のように入力します。<your_project_id> は、実際のプロジェクト ID に置き換えてください。
export PROJECT_ID=<your_project_id>
次に、Cloud Storage バケットについても同じことを行います。<your_bucket_name> は、前の手順でバケットの作成に使用した一意の名前に置き換えます。
export BUCKET_NAME=<your_bucket_name>
first-dataflow/ ディレクトリに移動します。
cd first-dataflow
次は、WordCount という名前のパイプラインを実行します。このパイプラインは、テキストを読み取り、テキスト行を個別の単語にトークン化したうえで、各単語の出現頻度をカウントします。まずパイプラインを実行し、実行中に各ステップで何が行われているかを確認します。
シェルまたはターミナル ウィンドウで mvn compile exec:java コマンドを実行して、パイプラインを開始します。下のコマンドで指定している --project, --stagingLocation, と --output の各引数では、この手順の前半で設定した環境変数が参照されます。
mvn compile exec:java \
-Pdataflow-runner compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--project=${PROJECT_ID} \
--stagingLocation=gs://${BUCKET_NAME}/staging/ \
--output=gs://${BUCKET_NAME}/output \
--runner=DataflowRunner \
--region=us-central1 \
--gcpTempLocation=gs://${BUCKET_NAME}/temp"
ジョブが実行されている間に、ジョブリストでジョブを探します。
Google Cloud Platform Console で Cloud Dataflow ウェブ UI を開きます。wordcount ジョブが [実行中] のステータスで表示されます。

次に、パイプライン パラメータを見てみましょう。最初に、ジョブの名前をクリックします。

ジョブを選択すると、実行グラフが表示されます。パイプラインの実行グラフは、パイプライン内の各変換を、変換名といくつかのステータス情報を含むボックスとして表します。各ステップの右上隅にあるアイコンをクリックすると、詳細が表示されます。

ここで、パイプラインの各ステップで行われるデータ変換について説明します。
- 読み取り: このステップでは、パイプラインは入力ソースから読み取ります。今回の入力ソースは、シェイクスピアの演劇、『リア王』のテキストがすべて含まれた Cloud Storage のテキスト ファイルです。パイプラインはファイルを 1 行ずつ読み取り、それぞれ
PCollectionを出力します。テキスト ファイルの各行はコレクションの要素です。 - CountWords:
CountWordsステップは 2 つのパートに分かれています。まず、ExtractWordsという名前の並列 do 関数(ParDo)を使用して、各行を個々の単語にトークン化します。ExtractWords の出力は、各要素が単語である新しい PCollection です。次のステップCountでは、Java SDK によって提供される変換を利用します。この変換は、キーが一意の単語で、値がその単語の出現回数である Key-Value ペアを返します。CountWordsを実装するメソッドは次のとおりです。WordCount.java ファイル全体は GitHub で確認できます。
/**
* A PTransform that converts a PCollection containing lines of text into a PCollection of
* formatted word counts.
*
* <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
* Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
* modular testing, and an improved monitoring experience.
*/
public static class CountWords
extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
// Count the number of times each word occurs.
PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());
return wordCounts;
}
}
- MapElements: これは、以下にコピーした
FormatAsTextFnを呼び出し、各 Key-Value ペアを出力可能な文字列に変換します。
/** A SimpleFunction that converts a Word and Count into a printable string. */
public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
@Override
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
}
}
- WriteCounts: このステップでは、出力可能な文字列を分割された複数のテキスト ファイルに書き込みます。
パイプラインからの出力結果は後ほど確認します。
次は、グラフの右側にある [ジョブ情報] ページを確認します。ここには、mvn compile exec:java コマンドに指定したパイプライン パラメータが表示されます。


パイプラインの [カスタム カウンタ] も表示されます。今回の場合、このカウンタはこれまでの処理中に検出された空の行の数を示しています。アプリケーション固有の指標を追跡するために、パイプラインに新しいカウンタを追加できます。

コンソールの下部にある [ログ] アイコンをクリックすると、特定のエラー メッセージを表示できます。

パネルにはデフォルトで、ジョブ全体のステータスを報告するジョブログ メッセージが表示されます。最小重要度セレクタを使用して、ジョブの処理状況とステータス メッセージをフィルタできます。

グラフのパイプライン ステップを選択すると、ビューは、コードによって生成されたログと、パイプライン ステップで実行中の生成されたコードに変更されます。
[ジョブのログ] に戻るには、グラフの外側をクリックするか右側のパネルの [閉じる] ボタンを使用して、ステップの選択を解除します。
ログタブの [ワーカーログ] ボタンを使用すると、パイプラインを実行する Compute Engine インスタンスのワーカーログを表示できます。ワーカーログには、コードで生成されるログ行と、それを実行している、Dataflow で生成されるコードが含まれます。
パイプラインの障害をデバッグしようとしている場合は、ワーカーログに追加のロギングが記録されていることが多く、問題の解決に役立ちます。これらのログはすべてのワーカーで集計され、フィルタと検索が可能です。

Dataflow Monitoring Interface には、最新のログメッセージのみが表示されます。ログペインの右側にある Google Cloud Observability リンクをクリックすると、すべてのログを表示できます。

ここでは、[モニタリング] → [ログ] ページで表示できるさまざまなログタイプの概要を示します。
- job-message ログには、Dataflow のさまざまなコンポーネントが生成するジョブレベルのメッセージが記録されます。たとえば、自動スケーリング構成、ワーカーの起動やシャットダウン、ジョブステップの進捗状況、ジョブエラーなどが記録されます。ユーザーコードのクラッシュに起因するワーカーレベルのエラーや、ワーカーログに記録されたエラーも job-message ログに記録されます。
- worker ログは Dataflow ワーカーによって生成されます。ワーカーは、パイプライン処理の大部分(データへの ParDo の適用など)を実行します。worker のログには、コードと Dataflow によって記録されたメッセージが含まれます。
- worker-startup ログは、ほとんどの Dataflow ジョブを表し、起動プロセスに関連するメッセージをキャプチャできます。起動プロセスには、Cloud Storage からのジョブの jar のダウンロードと、それに続くワーカーの起動が含まれます。ワーカーの起動に問題がある場合は、ここを調べることをおすすめします。
- shuffler ログには、並列パイプライン オペレーションの結果を統合する、ワーカーからのメッセージが含まれます。
- docker と kubelet のログには、Dataflow ワーカーで使用される、これらの一般公開されたテクノロジーに関連するメッセージが含まれます。
次のステップでは、ジョブが正常に実行されたことを確認します。
7. ジョブが正常に実行されたことを確認する
Google Cloud Platform Console で Cloud Dataflow ウェブ UI を開きます。
wordcount ジョブは、まずステータスが [実行中] と表示され、その後 [完了] に変わります。

ジョブの実行が完了するまでには約 3~4 分かかります。
パイプラインを実行したときに、出力バケットを指定しました。『リア王』に含まれる各単語の出現回数がどのようになっているのか、結果を見てみましょう。Google Cloud Platform Console で Cloud Storage ブラウザに戻ります。バケットに、ジョブによって作成された出力ファイルとステージング ファイルが表示されます。

8. リソースをシャットダウンする
リソースは Google Cloud Platform Console からシャットダウンできます。
Google Cloud Platform Console で Cloud Storage ブラウザを開きます。

作成したバケットの横にあるチェックボックスをオンにして、[削除] をクリックして、バケットとそのコンテンツを完全に削除します。


9. 完了
このラボでは、Cloud Dataflow SDK を含んだ Maven プロジェクトを作成し、Google Cloud Platform Console を使用してサンプル パイプラインを実行しました。また、関連する Cloud Storage バケットとその中身を削除する方法についても学習しました。
詳細
- Dataflow のドキュメント: https://cloud.google.com/dataflow/docs/
ライセンス
この作品は、クリエイティブ・コモンズの表示 3.0 汎用ライセンスと Apache 2.0 ライセンスにより使用許諾されています。