1. 概要
Dataflow とは
Dataflow は、さまざまなデータ処理パターンの実行に対応したマネージド サービスです。このサイトのドキュメントでは、Dataflow を使用してバッチおよびストリーミングのデータ処理パイプラインをデプロイする方法と、サービス機能の使用方法について説明しています。
Apache Beam SDK はオープンソースのプログラミング モデルであり、バッチとストリーミングの両方のパイプラインの開発に使用できます。Apache Beam プログラムでパイプラインを作成し、Dataflow サービスで実行します。Apache Beam のドキュメントには、詳細なコンセプト情報と Apache Beam のプログラミング モデル、SDK、その他のランナーのリファレンス情報が記載されています。
高速なストリーミング データ分析
Dataflow を使用すると、データ レイテンシを低く抑えながら、ストリーミング データ パイプラインを迅速かつ簡単に開発できます。
運用と管理の簡素化
Dataflow のサーバーレス アプローチにより、データ エンジニアリングのワークロードから運用上のオーバーヘッドが取り除かれ、チームはサーバー クラスタの管理ではなくプログラミングに集中できます。
総所有コストを削減
リソースの自動スケーリングとコスト最適化されたバッチ処理機能を組み合わせることにより、Dataflow で実質無制限の容量を利用できます。過剰な費用をかけずに、時季変動したり急変動したりするワークロードを管理できます。
主な機能
自動化されたリソース管理と動的作業再調整
Dataflow は、処理リソースのプロビジョニングと管理を自動化してレイテンシを最小限に抑え、使用率を最大化します。そのため、手動でインスタンスをスピンアップしたり予約したりする必要はありません。作業のパーティショニングも自動化、最適化され、遅れている作業を動的に再調整します。「ホットキー」を追跡する必要がない入力データを前処理することもできます。
水平自動スケーリング
ワーカー リソースの水平自動スケーリングによってスループットが最適化されるため、全体的なコスト パフォーマンスが向上します。
バッチ処理向けの柔軟なリソース スケジューリング料金
夜間のジョブなど、ジョブのスケジュールに柔軟性を持たせた処理には、Flexible Resource Scheduling(FlexRS)の方が低価格でバッチ処理できます。このようなフレキシブル ジョブはキューに入り、6 時間以内に確実にキューから取り出されて実行されます。
このチュートリアルは、https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven を基に作成されています。
学習内容
- Java SDK を使用して Apache Beam で Maven プロジェクトを作成する方法
- Google Cloud Platform Console を使用してサンプル パイプラインを実行する
- 関連する Cloud Storage バケットとその中身を削除する
必要なもの
このチュートリアルの利用方法をお選びください。
Google Cloud Platform サービスのご利用経験についてどのように評価されますか?
<ph type="x-smartling-placeholder">2. 設定と要件
セルフペース型の環境設定
- Cloud Console にログインし、新しいプロジェクトを作成するか、既存のプロジェクトを再利用します(Gmail アカウントまたは G Suite アカウントをお持ちでない場合は、アカウントを作成する必要があります)。
プロジェクト ID を忘れないようにしてください。プロジェクト ID はすべての Google Cloud プロジェクトを通じて一意の名前にする必要があります(上記の名前はすでに使用されているので使用できません)。以降、このコードラボでは PROJECT_ID
と呼びます。
- 次に、Google Cloud リソースを使用するために、Cloud Console で課金を有効にする必要があります。
このコードラボを実行しても、費用はほとんどかからないはずです。このチュートリアル以外で請求が発生しないように、リソースのシャットダウン方法を説明する「クリーンアップ」セクションの手順に従うようにしてください。Google Cloud の新規ユーザーは、300 米ドル分の無料トライアル プログラムをご利用いただけます。
API を有効にする
画面の左上にあるメニュー アイコンをクリックします。
[API とサービス >ダッシュボードを選択します。
[+ API とサービスの有効化] を選択します。
「Compute Engine」を検索します。」と入力します。[Compute Engine API] をクリックします。結果のリストに表示されます。
[Google Compute Engine] ページで [有効にする] をクリックします。
有効になったら、矢印をクリックして戻ります。
次の API を検索して有効にします。
- Cloud Dataflow
- Stackdriver
- Cloud Storage
- Cloud Storage JSON
- BigQuery
- Cloud Pub/Sub
- Cloud Datastore
- Cloud Resource Manager API
3. 新しい Cloud Storage バケットを作成する
Google Cloud Platform コンソールで、画面の左上にあるメニュー アイコンをクリックします。
下にスクロールして、[Cloud Storage >ブラウザの [ストレージ] サブセクション:
Cloud Storage ブラウザが表示されます。現在 Cloud Storage バケットがないプロジェクトを使用している場合は、新しいバケットを作成するための招待が表示されます。[バケットを作成] ボタンをクリックしてバケットを作成します。
バケットの名前を入力します。ダイアログ ボックスに示されているように、バケット名は Cloud Storage 全体で一意である必要があります。したがって、「test」などのわかりやすい名前を選択すると、他のユーザーがその名前でバケットをすでに作成していることがわかるので、エラーが返されます。
バケット名に使用できる文字に関するルールもあります。バケット名の先頭と末尾には英字または数字を使用し、中央にはダッシュのみを使用すれば問題ありません。特殊文字を使用したり、バケット名の先頭または末尾に文字や数字以外の文字を使用したりすると、ダイアログ ボックスでルールが表示されます。
バケットの一意の名前を入力し、[作成] をクリックします。すでに使用されているものを選択すると、上記のエラー メッセージが表示されます。バケットが正常に作成されると、新しい空のバケットがブラウザに表示されます。
すべてのプロジェクトで一意である必要があるため、表示されるバケット名はもちろんこれとは異なります。
4. Cloud Shell を起動する
Cloud Shell をアクティブにする
- Cloud Console で、[Cloud Shell をアクティブにする] をクリックします。
Cloud Shell を起動したことがない場合、その内容を説明する中間画面が(スクロールしなければ見えない範囲に)が表示されます。その場合は、[続行] をクリックします(以後表示されなくなります)。このワンタイム スクリーンは次のようになります。
Cloud Shell のプロビジョニングと接続に少し時間がかかる程度です。
この仮想マシンには、必要な開発ツールがすべて準備されています。5 GB の永続ホーム ディレクトリが用意されており、Google Cloud で稼働するため、ネットワーク パフォーマンスが充実しており認証もスムーズです。このコードラボでの作業のほとんどは、ブラウザまたは Chromebook から実行できます。
Cloud Shell に接続すると、すでに認証は完了しており、プロジェクトに各自のプロジェクト ID が設定されていることがわかります。
- Cloud Shell で次のコマンドを実行して、認証されたことを確認します。
gcloud auth list
コマンド出力
Credentialed Accounts ACTIVE ACCOUNT * <my_account>@<my_domain.com> To set the active account, run: $ gcloud config set account `ACCOUNT`
gcloud config list project
コマンド出力
[core] project = <PROJECT_ID>
上記のようになっていない場合は、次のコマンドで設定できます。
gcloud config set project <PROJECT_ID>
コマンド出力
Updated property [core/project].
5. Maven プロジェクトを作成する
Cloud Shell が起動したら、まず Apache Beam 用の Java SDK を使用して Maven プロジェクトを作成します。
Apache Beam は、データ パイプライン用のオープンソースのプログラミング モデルです。Apache Beam プログラムでこれらのパイプラインを定義し、パイプラインを実行する Dataflow などのランナーを選択できます。
次のように、シェルで mvn archetype:generate
コマンドを実行します。
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=2.46.0 \
-DgroupId=org.example \
-DartifactId=first-dataflow \
-Dversion="0.1" \
-Dpackage=org.apache.beam.examples \
-DinteractiveMode=false
このコマンドを実行すると、現在のディレクトリの下に first-dataflow
という新しいディレクトリが表示されます。first-dataflow
には、Cloud Dataflow SDK for Java とサンプル パイプラインを含む Maven プロジェクトが含まれています。
6. Cloud Dataflow でテキスト処理パイプラインを実行する
まず、プロジェクト ID と Cloud Storage バケット名を環境変数として保存します。環境変数にキーを保存するには、Cloud Shell で次のように入力します。<your_project_id>
は実際のプロジェクト ID に置き換えてください。
export PROJECT_ID=<your_project_id>
次に、Cloud Storage バケットについても同じことを行います。<your_bucket_name>
は、前の手順でバケットの作成に使用した一意の名前に置き換えます。
export BUCKET_NAME=<your_bucket_name>
first-dataflow/
ディレクトリに移動します。
cd first-dataflow
次は、WordCount という名前のパイプラインを実行します。このパイプラインは、テキストを読み取り、テキスト行を個別の単語にトークン化したうえで、各単語の出現頻度をカウントします。まずパイプラインを実行します。実行中に、各ステップで何が起こっているかを確認します。
シェルまたはターミナル ウィンドウで mvn compile exec:java
コマンドを実行して、パイプラインを開始します。以下のコマンドでは、--project, --stagingLocation,
引数と --output
引数について、この手順で前に設定した環境変数を参照します。
mvn compile exec:java \
-Pdataflow-runner compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--project=${PROJECT_ID} \
--stagingLocation=gs://${BUCKET_NAME}/staging/ \
--output=gs://${BUCKET_NAME}/output \
--runner=DataflowRunner \
--region=us-central1 \
--gcpTempLocation=gs://${BUCKET_NAME}/temp"
ジョブの実行中に、ジョブリストでそのジョブを見つけましょう。
Google Cloud Platform Console で Cloud Dataflow ウェブ UI を開きます。ワードカウント ジョブのステータスが [実行中] と表示されます。
次に、パイプラインのパラメータを見てみましょう。最初に、ジョブの名前をクリックします。
ジョブを選択すると、実行グラフを表示できます。パイプラインの実行グラフは、パイプライン内の各変換を、変換名といくつかのステータス情報を含むボックスとして表します。各ステップの右上隅にあるアイコンをクリックすると、詳細が表示されます。
ここで、パイプラインの各ステップで行われるデータ変換について説明します。
- 読み取り: このステップでは、入力ソースから読み取りを行います。この例では、シェイクスピア劇「リア王」のテキスト全体を含む Cloud Storage のテキスト ファイルです。このパイプラインはファイルを 1 行ずつ読み取り、それぞれ
PCollection
を出力します。ここで、テキスト ファイルの各行はコレクションの要素です。 - CountWords:
CountWords
ステップは 2 つの部分で構成されます。まず、ExtractWords
という名前の並列処理関数(ParDo)を使用して、各行を個々の単語にトークン化します。ExtractWords の出力は、各要素が単語である新しい PCollection です。次のステップCount
では、Java SDK に用意された変換を使用します。この変換は、一意の単語を一意の単語、値はその出現回数である、キーと値のペアを返します。以下にCountWords
を実装するメソッドを示します。GitHub で WordCount.java ファイル全体を確認できます。
/**
* A PTransform that converts a PCollection containing lines of text into a PCollection of
* formatted word counts.
*
* <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
* Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
* modular testing, and an improved monitoring experience.
*/
public static class CountWords
extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
// Count the number of times each word occurs.
PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());
return wordCounts;
}
}
- MapElements: これにより、以下にコピーされる
FormatAsTextFn
が呼び出されます。これにより、キーと値の各ペアが印刷可能な文字列に整形されます。
/** A SimpleFunction that converts a Word and Count into a printable string. */
public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
@Override
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
}
}
- WriteCounts: このステップでは、出力可能な文字列を複数のシャーディングされたテキスト ファイルに書き込みます。
パイプラインからの出力結果は後ほど確認します。
次に、グラフの右側にある [ジョブ情報] ページを見てみましょう。このページには、mvn compile exec:java
コマンドに含めたパイプライン パラメータが含まれています。
また、パイプラインの [カスタム カウンタ] も表示されます。この場合は、実行中にこれまでに検出された空の行の数を示しています。アプリケーション固有の指標を追跡するために、パイプラインに新しいカウンタを追加できます。
コンソールの下部にあるログアイコンをクリックすると、特定のエラー メッセージを表示できます。
このパネルには、デフォルトでジョブ全体のステータスを報告するジョブログ メッセージが表示されます。最小重要度セレクタを使用して、ジョブの進行状況とステータス メッセージをフィルタできます。
グラフでパイプライン ステップを選択すると、ビューは、コードによって生成されたログと、パイプライン ステップで実行されている生成されたコードに変更されます。
ジョブのログに戻るには、グラフの外側をクリックするか右側のパネルの閉じるボタンを使用して、ステップの選択を解除します。
[ログ] タブの [ワーカーログ] ボタンを使用して、パイプラインを実行する Compute Engine インスタンスのワーカーログを表示できます。ワーカーログには、コードで生成されるログ行と、それを実行している、Dataflow で生成されるコードが含まれます。
パイプラインの障害をデバッグする場合、問題の解決に役立つ追加のログが [ワーカーログ] に記録されることがあります。これらのログはすべてのワーカーから集計され、フィルタや検索が可能です。
Dataflow モニタリング インターフェースには最新のログメッセージのみが表示されます。ログペインの右側にある Google Cloud Observability のリンクをクリックすると、すべてのログを表示できます。
[モニタリング] → [ログ] ページで表示できるさまざまなログタイプの概要は次のとおりです。
- job-message ログには、Dataflow のさまざまなコンポーネントが生成するジョブレベルのメッセージが記録されます。たとえば、自動スケーリングの構成、ワーカーの起動またはシャットダウン、ジョブステップの進行状況、ジョブエラーなどです。ユーザーコードのクラッシュに起因し、worker ログに存在するワーカーレベルのエラーも job-message ログに伝播されます。
- worker ログは Dataflow ワーカーによって生成されます。ワーカーは、パイプライン処理のほとんど(ParDo をデータに適用するなど)を行います。ワーカーログには、コードと Dataflow によって記録されたメッセージが含まれます。
- worker-startup ログは、ほとんどの Dataflow ジョブに存在し、起動プロセスに関連するメッセージをキャプチャできます。起動プロセスでは、Cloud Storage からジョブの jar をダウンロードし、ワーカーを起動します。ワーカーの起動に問題がある場合は、これらのログを確認することをおすすめします。
- shuffler ログには、並列パイプライン オペレーションの結果を統合するワーカーからのメッセージが含まれます。
- docker と kubelet のログには、Dataflow ワーカーで使用される、これらの一般公開されたテクノロジーに関連するメッセージが含まれます。
次のステップでは、ジョブが正常に実行されたかどうかを確認します。
7. ジョブが成功したことを確認する
Google Cloud Platform Console で Cloud Dataflow ウェブ UI を開きます。
ワードカウント ジョブのステータスが最初に [実行中] と表示され、その後 [完了] が表示されます。
ジョブの実行が完了するまでには約 3~4 分かかります。
パイプラインを実行したときに、出力バケットを指定しました。結果を見てみましょう(リア王の各単語が何回出現したかは関係ないため)。Google Cloud Platform コンソールで Cloud Storage ブラウザに戻ります。バケットに、ジョブによって作成された出力ファイルとステージング ファイルが表示されます。
8. リソースをシャットダウンする
リソースは、Google Cloud Platform Console からシャットダウンできます。
Google Cloud Platform Console で Cloud Storage ブラウザを開きます。
作成したバケットの横にあるチェックボックスをオンにして、[削除] をクリックして、バケットとその内容を完全に削除します。
9. 完了
このラボでは、Cloud Dataflow SDK を含んだ Maven プロジェクトを作成し、Google Cloud Platform Console を使用してサンプル パイプラインを実行しました。また、関連する Cloud Storage バケットとその中身を削除する方法についても学習しました。
詳細
- Dataflow のドキュメント: https://cloud.google.com/dataflow/docs/
ライセンス
このソフトウェアは、クリエイティブ・コモンズの表示 3.0 汎用ライセンス、および Apache 2.0 ライセンスにより使用許諾されています。