Cloud Dataflow でビッグデータ テキスト処理パイプラインを実行する

1. 概要

Cloud-Dataflow.png

Dataflow とは

Dataflow は、さまざまなデータ処理パターンの実行に対応したマネージド サービスです。このサイトのドキュメントでは、Dataflow を使用してバッチおよびストリーミングのデータ処理パイプラインをデプロイする方法と、サービス機能の使用方法について説明しています。

Apache Beam SDK はオープンソースのプログラミング モデルであり、バッチとストリーミングの両方のパイプラインの開発に使用できます。Apache Beam プログラムでパイプラインを作成し、Dataflow サービスで実行します。Apache Beam のドキュメントには、詳細なコンセプト情報と Apache Beam のプログラミング モデル、SDK、その他のランナーのリファレンス情報が記載されています。

高速なストリーミング データ分析

Dataflow を使用すると、データ レイテンシを低く抑えながら、ストリーミング データ パイプラインを迅速かつ簡単に開発できます。

運用と管理の簡素化

Dataflow のサーバーレス アプローチにより、データ エンジニアリングのワークロードから運用上のオーバーヘッドが取り除かれ、チームはサーバー クラスタの管理ではなくプログラミングに集中できます。

総所有コストを削減

リソースの自動スケーリングとコスト最適化されたバッチ処理機能を組み合わせることにより、Dataflow で実質無制限の容量を利用できます。過剰な費用をかけずに、時季変動したり急変動したりするワークロードを管理できます。

主な機能

自動化されたリソース管理と動的作業再調整

Dataflow は、処理リソースのプロビジョニングと管理を自動化してレイテンシを最小限に抑え、使用率を最大化します。そのため、手動でインスタンスをスピンアップしたり予約したりする必要はありません。作業のパーティショニングも自動化、最適化され、遅れている作業を動的に再調整します。「ホットキー」を追跡する必要がない入力データを前処理することもできます。

水平自動スケーリング

ワーカー リソースの水平自動スケーリングによってスループットが最適化されるため、全体的なコスト パフォーマンスが向上します。

バッチ処理向けの柔軟なリソース スケジューリング料金

夜間のジョブなど、ジョブのスケジュールに柔軟性を持たせた処理には、Flexible Resource Scheduling(FlexRS)の方が低価格でバッチ処理できます。このようなフレキシブル ジョブはキューに入り、6 時間以内に確実にキューから取り出されて実行されます。

このチュートリアルは、https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven を基に作成されています。

学習内容

  • Java SDK を使用して Apache Beam で Maven プロジェクトを作成する方法
  • Google Cloud Platform Console を使用してサンプル パイプラインを実行する
  • 関連する Cloud Storage バケットとその中身を削除する

必要なもの

このチュートリアルの利用方法をお選びください。

通読するのみ 通読し、演習を行う

Google Cloud Platform サービスのご利用経験についてどのように評価されますか?

<ph type="x-smartling-placeholder"></ph> 初心者 中級 上達 をご覧ください。

2. 設定と要件

セルフペース型の環境設定

  1. Cloud Console にログインし、新しいプロジェクトを作成するか、既存のプロジェクトを再利用します(Gmail アカウントまたは G Suite アカウントをお持ちでない場合は、アカウントを作成する必要があります)。

dMbN6g9RawQj_VXCSYpdYncY-DbaRzr2GbnwoV7jFf1u3avxJtmGPmKpMYgiaMH-qu80a_NJ9p2IIXFppYk8x3wyymZXavjglNLJJhuXieCem56H30hwXtd8PvXGpXJO9gEUDu3cZw

ci9Oe6PgnbNuSYlMyvbXF1JdQyiHoEgnhl4PlV_MFagm2ppzhueRkqX4eLjJllZco_2zCp0V0bpTupUSKji9KkQyWqj11pqit1K1faS1V6aFxLGQdkuzGp4rsQTan7F01iePL5DtqQ

8-tA_Lheyo8SscAVKrGii2coplQp2_D1Iosb2ViABY0UUO1A8cimXUu6Wf1R9zJIRExL5OB2j946aIiFtyKTzxDcNnuznmR45vZ2HMoK3o67jxuoUJCAnqvEX6NgPGFjCVNgASc-lg

プロジェクト ID を忘れないようにしてください。プロジェクト ID はすべての Google Cloud プロジェクトを通じて一意の名前にする必要があります(上記の名前はすでに使用されているので使用できません)。以降、このコードラボでは PROJECT_ID と呼びます。

  1. 次に、Google Cloud リソースを使用するために、Cloud Console で課金を有効にする必要があります。

このコードラボを実行しても、費用はほとんどかからないはずです。このチュートリアル以外で請求が発生しないように、リソースのシャットダウン方法を説明する「クリーンアップ」セクションの手順に従うようにしてください。Google Cloud の新規ユーザーは、300 米ドル分の無料トライアル プログラムをご利用いただけます。

API を有効にする

画面の左上にあるメニュー アイコンをクリックします。

2bfc27ef9ba2ec7d.png

[API とサービス >ダッシュボードを選択します。

5b65523a6cc0afa6.png

[+ API とサービスの有効化] を選択します。

81ed72192c0edd96.png

「Compute Engine」を検索します。」と入力します。[Compute Engine API] をクリックします。結果のリストに表示されます。

3f201e991c7b4527.png

[Google Compute Engine] ページで [有効にする] をクリックします。

ac121653277fa7bb.png

有効になったら、矢印をクリックして戻ります。

次の API を検索して有効にします。

  • Cloud Dataflow
  • Stackdriver
  • Cloud Storage
  • Cloud Storage JSON
  • BigQuery
  • Cloud Pub/Sub
  • Cloud Datastore
  • Cloud Resource Manager API

3. 新しい Cloud Storage バケットを作成する

Google Cloud Platform コンソールで、画面の左上にあるメニュー アイコンをクリックします。

2bfc27ef9ba2ec7d.png

下にスクロールして、[Cloud Storage >ブラウザの [ストレージ] サブセクション:

2b6c3a2a92b47015.png

Cloud Storage ブラウザが表示されます。現在 Cloud Storage バケットがないプロジェクトを使用している場合は、新しいバケットを作成するための招待が表示されます。[バケットを作成] ボタンをクリックしてバケットを作成します。

a711016d5a99dc37.png

バケットの名前を入力します。ダイアログ ボックスに示されているように、バケット名は Cloud Storage 全体で一意である必要があります。したがって、「test」などのわかりやすい名前を選択すると、他のユーザーがその名前でバケットをすでに作成していることがわかるので、エラーが返されます。

バケット名に使用できる文字に関するルールもあります。バケット名の先頭と末尾には英字または数字を使用し、中央にはダッシュのみを使用すれば問題ありません。特殊文字を使用したり、バケット名の先頭または末尾に文字や数字以外の文字を使用したりすると、ダイアログ ボックスでルールが表示されます。

3a5458648cfe3358.png

バケットの一意の名前を入力し、[作成] をクリックします。すでに使用されているものを選択すると、上記のエラー メッセージが表示されます。バケットが正常に作成されると、新しい空のバケットがブラウザに表示されます。

3bda986ae88c4e71.png

すべてのプロジェクトで一意である必要があるため、表示されるバケット名はもちろんこれとは異なります。

4. Cloud Shell を起動する

Cloud Shell をアクティブにする

  1. Cloud Console で、[Cloud Shell をアクティブにする] H7JlbhKGHITmsxhQIcLwoe5HXZMhDlYue4K-SPszMxUxDjIeWfOHBfxDHYpmLQTzUmQ7Xx8o6OJUlANnQF0iBuUyfp1RzVad_4nCa0Zz5LtwBlUZFXFCWFrmrWZLqg1MkZz2LdgUDQ をクリックします。

zlNW0HehB_AFW1qZ4AyebSQUdWm95n7TbnOr7UVm3j9dFcg6oWApJRlC0jnU1Mvb-IQp-trP1Px8xKNwt6o3pP6fyih947sEhOFI4IRF0W7WZk6hFqZDUGXQQXrw21GuMm2ecHrbzQ

Cloud Shell を起動したことがない場合、その内容を説明する中間画面が(スクロールしなければ見えない範囲に)が表示されます。その場合は、[続行] をクリックします(以後表示されなくなります)。このワンタイム スクリーンは次のようになります。

kEPbNAo_w5C_pi9QvhFwWwky1cX8hr_xEMGWySNIoMCdi-Djx9AQRqWn-__DmEpC7vKgUtl-feTcv-wBxJ8NwzzAp7mY65-fi2LJo4twUoewT1SUjd6Y3h81RG3rKIkqhoVlFR-G7w

Cloud Shell のプロビジョニングと接続に少し時間がかかる程度です。

pTv5mEKzWMWp5VBrg2eGcuRPv9dLInPToS-mohlrqDASyYGWnZ_SwE-MzOWHe76ZdCSmw0kgWogSJv27lrQE8pvA5OD6P1I47nz8vrAdK7yR1NseZKJvcxAZrPb8wRxoqyTpD-gbhA

この仮想マシンには、必要な開発ツールがすべて準備されています。5 GB の永続ホーム ディレクトリが用意されており、Google Cloud で稼働するため、ネットワーク パフォーマンスが充実しており認証もスムーズです。このコードラボでの作業のほとんどは、ブラウザまたは Chromebook から実行できます。

Cloud Shell に接続すると、すでに認証は完了しており、プロジェクトに各自のプロジェクト ID が設定されていることがわかります。

  1. Cloud Shell で次のコマンドを実行して、認証されたことを確認します。
gcloud auth list

コマンド出力

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
gcloud config list project

コマンド出力

[core]
project = <PROJECT_ID>

上記のようになっていない場合は、次のコマンドで設定できます。

gcloud config set project <PROJECT_ID>

コマンド出力

Updated property [core/project].

5. Maven プロジェクトを作成する

Cloud Shell が起動したら、まず Apache Beam 用の Java SDK を使用して Maven プロジェクトを作成します。

Apache Beam は、データ パイプライン用のオープンソースのプログラミング モデルです。Apache Beam プログラムでこれらのパイプラインを定義し、パイプラインを実行する Dataflow などのランナーを選択できます。

次のように、シェルで mvn archetype:generate コマンドを実行します。

  mvn archetype:generate \
     -DarchetypeGroupId=org.apache.beam \
     -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
     -DarchetypeVersion=2.46.0 \
     -DgroupId=org.example \
     -DartifactId=first-dataflow \
     -Dversion="0.1" \
     -Dpackage=org.apache.beam.examples \
     -DinteractiveMode=false

このコマンドを実行すると、現在のディレクトリの下に first-dataflow という新しいディレクトリが表示されます。first-dataflow には、Cloud Dataflow SDK for Java とサンプル パイプラインを含む Maven プロジェクトが含まれています。

6. Cloud Dataflow でテキスト処理パイプラインを実行する

まず、プロジェクト ID と Cloud Storage バケット名を環境変数として保存します。環境変数にキーを保存するには、Cloud Shell で次のように入力します。<your_project_id> は実際のプロジェクト ID に置き換えてください。

 export PROJECT_ID=<your_project_id>

次に、Cloud Storage バケットについても同じことを行います。<your_bucket_name> は、前の手順でバケットの作成に使用した一意の名前に置き換えます。

 export BUCKET_NAME=<your_bucket_name>

first-dataflow/ ディレクトリに移動します。

 cd first-dataflow

次は、WordCount という名前のパイプラインを実行します。このパイプラインは、テキストを読み取り、テキスト行を個別の単語にトークン化したうえで、各単語の出現頻度をカウントします。まずパイプラインを実行します。実行中に、各ステップで何が起こっているかを確認します。

シェルまたはターミナル ウィンドウで mvn compile exec:java コマンドを実行して、パイプラインを開始します。以下のコマンドでは、--project, --stagingLocation, 引数と --output 引数について、この手順で前に設定した環境変数を参照します。

 mvn compile exec:java \
      -Pdataflow-runner compile exec:java \
      -Dexec.mainClass=org.apache.beam.examples.WordCount \
      -Dexec.args="--project=${PROJECT_ID} \
      --stagingLocation=gs://${BUCKET_NAME}/staging/ \
      --output=gs://${BUCKET_NAME}/output \
      --runner=DataflowRunner \
      --region=us-central1 \
      --gcpTempLocation=gs://${BUCKET_NAME}/temp"

ジョブの実行中に、ジョブリストでそのジョブを見つけましょう。

Google Cloud Platform Console で Cloud Dataflow ウェブ UI を開きます。ワードカウント ジョブのステータスが [実行中] と表示されます。

3623be74922e3209.png

次に、パイプラインのパラメータを見てみましょう。最初に、ジョブの名前をクリックします。

816d8f59c72797d7.png

ジョブを選択すると、実行グラフを表示できます。パイプラインの実行グラフは、パイプライン内の各変換を、変換名といくつかのステータス情報を含むボックスとして表します。各ステップの右上隅にあるアイコンをクリックすると、詳細が表示されます。

80a972dd19a6f1eb.png

ここで、パイプラインの各ステップで行われるデータ変換について説明します。

  • 読み取り: このステップでは、入力ソースから読み取りを行います。この例では、シェイクスピア劇「リア王」のテキスト全体を含む Cloud Storage のテキスト ファイルです。このパイプラインはファイルを 1 行ずつ読み取り、それぞれ PCollection を出力します。ここで、テキスト ファイルの各行はコレクションの要素です。
  • CountWords: CountWords ステップは 2 つの部分で構成されます。まず、ExtractWords という名前の並列処理関数(ParDo)を使用して、各行を個々の単語にトークン化します。ExtractWords の出力は、各要素が単語である新しい PCollection です。次のステップ Count では、Java SDK に用意された変換を使用します。この変換は、一意の単語を一意の単語、値はその出現回数である、キーと値のペアを返します。以下に CountWords を実装するメソッドを示します。GitHub で WordCount.java ファイル全体を確認できます。
 /**
   * A PTransform that converts a PCollection containing lines of text into a PCollection of
   * formatted word counts.
   *
   * <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
   * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
   * modular testing, and an improved monitoring experience.
   */
  public static class CountWords
      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());

      return wordCounts;
    }
  }
  • MapElements: これにより、以下にコピーされる FormatAsTextFn が呼び出されます。これにより、キーと値の各ペアが印刷可能な文字列に整形されます。
  /** A SimpleFunction that converts a Word and Count into a printable string. */
  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    @Override
    public String apply(KV<String, Long> input) {
      return input.getKey() + ": " + input.getValue();
    }
  }
  • WriteCounts: このステップでは、出力可能な文字列を複数のシャーディングされたテキスト ファイルに書き込みます。

パイプラインからの出力結果は後ほど確認します。

次に、グラフの右側にある [ジョブ情報] ページを見てみましょう。このページには、mvn compile exec:java コマンドに含めたパイプライン パラメータが含まれています。

9723815a1f5bf08b.png

208a7f0d6973acf6.png

また、パイプラインの [カスタム カウンタ] も表示されます。この場合は、実行中にこれまでに検出された空の行の数を示しています。アプリケーション固有の指標を追跡するために、パイプラインに新しいカウンタを追加できます。

a2e2800e2c6893f8.png

コンソールの下部にあるログアイコンをクリックすると、特定のエラー メッセージを表示できます。

23c64138a1027f8.png

このパネルには、デフォルトでジョブ全体のステータスを報告するジョブログ メッセージが表示されます。最小重要度セレクタを使用して、ジョブの進行状況とステータス メッセージをフィルタできます。

94ba42015fdafbe2.png

グラフでパイプライン ステップを選択すると、ビューは、コードによって生成されたログと、パイプライン ステップで実行されている生成されたコードに変更されます。

ジョブのログに戻るには、グラフの外側をクリックするか右側のパネルの閉じるボタンを使用して、ステップの選択を解除します。

[ログ] タブの [ワーカーログ] ボタンを使用して、パイプラインを実行する Compute Engine インスタンスのワーカーログを表示できます。ワーカーログには、コードで生成されるログ行と、それを実行している、Dataflow で生成されるコードが含まれます。

パイプラインの障害をデバッグする場合、問題の解決に役立つ追加のログが [ワーカーログ] に記録されることがあります。これらのログはすべてのワーカーから集計され、フィルタや検索が可能です。

5a53c244f28d5478.png

Dataflow モニタリング インターフェースには最新のログメッセージのみが表示されます。ログペインの右側にある Google Cloud Observability のリンクをクリックすると、すべてのログを表示できます。

2bc704a4d6529b31.png

[モニタリング] → [ログ] ページで表示できるさまざまなログタイプの概要は次のとおりです。

  • job-message ログには、Dataflow のさまざまなコンポーネントが生成するジョブレベルのメッセージが記録されます。たとえば、自動スケーリングの構成、ワーカーの起動またはシャットダウン、ジョブステップの進行状況、ジョブエラーなどです。ユーザーコードのクラッシュに起因し、worker ログに存在するワーカーレベルのエラーも job-message ログに伝播されます。
  • worker ログは Dataflow ワーカーによって生成されます。ワーカーは、パイプライン処理のほとんど(ParDo をデータに適用するなど)を行います。ワーカーログには、コードと Dataflow によって記録されたメッセージが含まれます。
  • worker-startup ログは、ほとんどの Dataflow ジョブに存在し、起動プロセスに関連するメッセージをキャプチャできます。起動プロセスでは、Cloud Storage からジョブの jar をダウンロードし、ワーカーを起動します。ワーカーの起動に問題がある場合は、これらのログを確認することをおすすめします。
  • shuffler ログには、並列パイプライン オペレーションの結果を統合するワーカーからのメッセージが含まれます。
  • dockerkubelet のログには、Dataflow ワーカーで使用される、これらの一般公開されたテクノロジーに関連するメッセージが含まれます。

次のステップでは、ジョブが正常に実行されたかどうかを確認します。

7. ジョブが成功したことを確認する

Google Cloud Platform Console で Cloud Dataflow ウェブ UI を開きます。

ワードカウント ジョブのステータスが最初に [実行中] と表示され、その後 [完了] が表示されます。

4c408162416d03a2.png

ジョブの実行が完了するまでには約 3~4 分かかります。

パイプラインを実行したときに、出力バケットを指定しました。結果を見てみましょう(リア王の各単語が何回出現したかは関係ないため)。Google Cloud Platform コンソールで Cloud Storage ブラウザに戻ります。バケットに、ジョブによって作成された出力ファイルとステージング ファイルが表示されます。

25a5d3d4b5d0b567.png

8. リソースをシャットダウンする

リソースは、Google Cloud Platform Console からシャットダウンできます。

Google Cloud Platform Console で Cloud Storage ブラウザを開きます。

2b6c3a2a92b47015.png

作成したバケットの横にあるチェックボックスをオンにして、[削除] をクリックして、バケットとその内容を完全に削除します。

2f7780bdf10b69ba.png

8051ef293a8e5cfe.png

9. 完了

このラボでは、Cloud Dataflow SDK を含んだ Maven プロジェクトを作成し、Google Cloud Platform Console を使用してサンプル パイプラインを実行しました。また、関連する Cloud Storage バケットとその中身を削除する方法についても学習しました。

詳細

ライセンス

このソフトウェアは、クリエイティブ・コモンズの表示 3.0 汎用ライセンス、および Apache 2.0 ライセンスにより使用許諾されています。