Dataflow は ETL やバッチ処理、継続的な計算処理などの幅広いデータ処理方法を開発、実行するための統合型プログラミング モデルのマネージド サービスです。マネージド サービスの Dataflow では必要に応じてリソースを割り当てることができ、レイテンシを最小限に抑えつつ、リソースの利用効率を高いレベルに維持できます。

Dataflow モデルはバッチ処理とストリーム処理のどちらにも対応するので、デベロッパーは正確性とコスト、処理時間を天秤にかけて何かに妥協する必要がなくなります。このコードラボでは、テキスト ファイル内の一意の単語の出現回数を数える Dataflow パイプラインの実行方法について学びます。

このチュートリアルはhttps://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven を基にしています。

学習すること

必要になるもの

チュートリアルの使用目的を教えてください

通読のみ 通読して、演習を行う

Google Cloud Platform のサービスの使用経験を教えてください

初心者 中級者 上級者

自分のペースで進める環境のセットアップ

If you don't already have a Google Account (Gmail or Google Apps), you must create one. Sign-in to Google Cloud Platform console (console.cloud.google.com) and create a new project:

Remember the project ID, a unique name across all Google Cloud projects (the name above has already been taken and will not work for you, sorry!). It will be referred to later in this codelab as PROJECT_ID.

Next, you'll need to enable billing in the Developers Console in order to use Google Cloud resources.

Running through this codelab shouldn't cost you more than a few dollars, but it could be more if you decide to use more resources or if you leave them running (see "cleanup" section at the end of this document). Google Container Engine pricing is documented here.

New users of Google Cloud Platform are eligible for a $300 free trial.

API を有効にする

画面の左上にあるメニューアイコンをクリックします。

ドロップダウンで [API Manager] を選択します。

を選択します。

検索ボックスで [Google Compute Engine] を検索し、表示された結果の中から [Google Compute Engine API] をクリックします。

[Google Compute Engine API] のページで [有効にする] をクリックします。

有効になったら、矢印をクリックして戻ります。

次に、以下の API も検索して、それぞれを有効にします。

Google Cloud Platform Console で、画面の左上にあるメニューアイコンをクリックします。

下にスクロールして、[ストレージ] サブセクションの [Cloud Storage] を選択します。

これで Cloud Storage のブラウザが表示されます。さらに、現在 Cloud Storage バケットのないプロジェクトを使用している場合は、新しいバケットの作成を勧めるダイアログボックスが表示されます。

[バケットを作成] ボタンを押して、バケットを作成します。

ご自分のバケットに付ける名前を入力します。ダイアログボックスに説明があるとおり、バケット名は Cloud Storage 内全体で一意でなければなりません。したがって、「test」のような安易な名前を付けようとすると、その名前が付けられたバケットがすでに別のユーザーによって作成されていることがあり、エラーが表示されます。

また、バケット名に使用できる文字にはルールがいくつかあります。バケット名の先頭と末尾には文字か数字を使用し、ダッシュ記号は先頭と末尾以外に使用してください。特殊文字を入力した場合や、バケット名の先頭と末尾に文字か数字以外を使用している場合は、ダイアログボックスにこれらのルールが表示されます。

一意のバケット名を入力し、[作成] を押します。すでに使用中のバケット名を入力すると、前述のエラー メッセージが表示されます。バケットの作成に成功すると、ブラウザに新しいバケットが表示されます。

当然ですが、ここに表示されるバケット名は独自の名前になります。これは、バケット名はすべてのプロジェクトを超えて一意でなければならないからです。

Google Cloud はラップトップからリモートで操作できるため、本コードラボでは、Cloud で動作するコマンドライン環境である Google Cloud Shell を使用します。この Debian ベース仮想マシンには、必要なすべての開発ツール (docker、gcloud、kubectl など) がロードされています。それは、永続的な 5GB ホームディレクトリを提供し、Google Cloud で動作し、ネットワーク性能と認証を大幅に改善します。そのため、本コードラボに必要なものは、ブラウザのみです (本コードラボは Chromebook でも動作します)

Google Cloud Shell をアクティブにするには、開発者コンソールで右上のボタンをクリックするだけです (プロビジョニングと環境への接続には数分しかかかりません)。

クラウド シェルに接続すると、すでに認証済みであり、プロジェクトがすでに PROJECT_ID に設定されていることが分かります。

$ gcloud auth list
認定済みアカウント:
 - <myaccount>@<mydomain>.com (アクティブ)
$ gcloud config list project
[core]
project = <PROJECT_ID>

何かしらの理由でプロジェクトが設定されていない場合、次のコマンドを発行してください。

$ gcloud config set project <PROJECT_ID>

PROJECT_ID が分かりませんか?セットアップステップでどの ID を使用したかを確認するか、コンソール ダッシュボードで検索してください。

Cloud Shell が起動したら、まずは Cloud Dataflow SDK for Java を含む Maven プロジェクトを作成してみましょう。

シェルで、以下の mvn archetype:generate コマンドを実行します。

  mvn archetype:generate \
     -DarchetypeArtifactId=google-cloud-dataflow-java-archetypes-examples \
     -DarchetypeGroupId=com.google.cloud.dataflow \
     -DarchetypeVersion=1.9.0 \
     -DgroupId=com.example \
     -DartifactId=first-dataflow \
     -Dversion="0.1" \
     -DinteractiveMode=false \
     -Dpackage=com.example

コマンドを実行すると、現在のディレクトリの下に first-dataflow という新しいディレクトリが表示されます。first-dataflow には、Cloud Dataflow SDK for Java とサンプルのパイプラインを含む Maven プロジェクトが含まれています。

まず、プロジェクト ID と Cloud Storage のバケット名を環境変数として保存します。キーの保存は Cloud Shell で行えます。以下の <your_project_id> をご自分のプロジェクト ID に置き換えてください。

 export PROJECT_ID=<your_project_id>

Cloud Storage のバケットにも同様のことを行います。この場合も <your_bucket_name> を、前の手順でバケットを作成するときに使用した一意の名前に置き換えてください。

 export BUCKET_NAME=<your_bucket_name>

first-dataflow/ ディレクトリに変えます。

 cd first-dataflow

ここからは、WordCount というパイプラインを実行します。このパイプラインは、テキストを読み取り、テキスト行を個別の単語にトークン化し、それらの単語それぞれについて頻度計算を実行します。最初にパイプラインを実行し、その実行中に各ステップで何が行われているのかについて見ていきます。

シェルまたはターミナル ウィンドウで mvn compile exec:java のコマンドを実行して、パイプラインを開始します。--project--stagingLocation、および --output の引数については、以下のコマンドが、この手順の前半でセットアップした環境変数を参照します。

 mvn compile exec:java \
      -Dexec.mainClass=com.example.WordCount \
      -Dexec.args="--project=${PROJECT_ID} \
      --stagingLocation=gs://${BUCKET_NAME}/staging/ \
      --output=gs://${BUCKET_NAME}/output \
      --runner=BlockingDataflowPipelineRunner"

ジョブを実行している間に、ジョブ リストでジョブを見てみましょう。

Google Cloud Platform Console で Cloud Dataflow Monitoring UI を開きます。wordcount ジョブのステータスが [実行中] であることを確認してください。

それでは、パイプラインのパラメーターを見てみましょう。ジョブ名をクリックしてください。

ジョブを選択すると、実行グラフが表示されます。パイプラインの実行グラフは、パイプライン内の各変換を、変換名といくつかのステータス情報を含むボックスとして表しています。各ステップの右上にあるカラットをクリックすると、詳細が表示されます。

各ステップでパイプラインがどのようにデータを変換しているのか見てみましょう。

  /**
   * A PTransform that converts a PCollection containing lines of text 
   * into a PCollection of formatted word counts.
   */
  public static class CountWords extends PTransform<PCollection<String>,
      PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> apply(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(
          ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts =
          words.apply(Count.<String>perElement());

      return wordCounts;
    }
  }
  /** A SimpleFunction that converts a Word and Count into a printable string. */
  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    @Override
    public String apply(KV<String, Long> input) {
      return input.getKey() + ": " + input.getValue();
    }
  }

パイプラインから出力される結果については、数分後に確認します。

次に、グラフの右側の [概要] ページを見てみましょう。ここには、mvn compile exec:java のコマンドに含めたパイプラインのパラメーターが表示されています。

また、パイプラインの [カスタム カウンタ] も確認できます。この場合は、実行中の現時点までに見つかった空行の数が表示されています。 ここでは、アプリケーション固有のメトリクスを追跡するために、パイプラインに新しいカウンタを追加することもできます。

[ログ] アイコンをクリックすると、具体的なエラー メッセージを確認できます。

[ジョブログ] タブに表示されるメッセージをフィルタリングするには、[最小重要度] のドロップダウン メニューを使います。

ジョブのログを使うと、パイプラインを実行している Compute Engine インスタンスのワーカーログを表示できます。ワーカーログは、ご自分のコードと それを実行している Dataflow が生成したコードによって生成されたログ行で構成されています。

パイプラインで発生した問題のデバッグを試みている場合、その問題の解決に役立つログがワーカーログ内に追加で記録されることがよくあります。 これらのログは、すべてのワーカーを対象に集計されているもので、フィルタリングや検索を行えることを覚えておいてください。

次のステップでは、ジョブが成功したか確認します。

Google Cloud Platform Console で Cloud Dataflow Monitoring UI を開きます。

wordcount ジョブのステータスが、[実行中] から [成功] に変わるのがわかります。

ジョブの実行には、およそ 3 ~ 4 分かかります。

パイプラインを実行して出力バケットを指定したのはいつだったか、覚えていますか?ここで、「リア王」にどのような単語が何回出現したのか、結果を見てみましょう。Google Cloud Platform Console の Cloud Storage ブラウザに戻ります。ご自分のバケットの中には、ジョブで作成された出力ファイルとステージング ファイルがあります。

リソースの停止はGoogle Cloud Platform Console で行えます。

Google Cloud Platform Console で Cloud Storage ブラウザを開きます。

ご自分で作成したバケットの隣にあるチェックボックスを選択します。

[削除] をクリックすると、バケットとその内容が永久的に削除されます。

このコードラボでは、Cloud Dataflow SDK を含む Maven プロジェクトを作成する方法、Google Cloud Platform Console を使ってサンプルのパイプラインを実行する方法、そして関連付けられている Cloud Storage バケットおよびその内容を削除する方法を学びました。

知識を深めるには

ライセンス

この資料は、Creative Commons 表示 3.0 ジェネリック・ライセンスおよび Apache License 2.0 の許可を得ています。