Dataproc クラスタでの Hadoop ワードカウント ジョブの実行

1. はじめに

ワークフローはデータ分析における一般的なユースケースのひとつで、データの取り込み、変換、分析によってデータ内で有益な情報を見つけるために使用されます。Google Cloud Platform には、ワークフローをオーケストレートするためのツールとして Cloud Composer が用意されています。これは、よく利用されているオープンソース ワークフロー ツールの Apache Airflow をホスト型にしたものです。このラボでは、Cloud Composer を使用して、Cloud Dataproc クラスタを作成し、Cloud Dataproc と Apache Hadoop を使用して分析し、その後 Cloud Dataproc クラスタを削除するシンプルなワークフローを作成します。

Cloud Composer とは

Cloud Composer は、フルマネージドなワークフロー オーケストレーション サービスです。クラウドとオンプレミス データセンターにまたがるパイプラインの作成、スケジューリング、モニタリングを実現します。よく利用されている Apache Airflow オープンソース プロジェクトを基に構築され、Python プログラミング言語を使用して運用されています。Cloud Composer は、特定のベンダーに依存することがなく、使いやすいサービスです。

Apache Airflow のローカル インスタンスではなく Cloud Composer を使用すると、ユーザーはインストールや管理のオーバーヘッドなしに、Airflow のメリットを活用できます。

Apache Airflow とは

Apache Airflow は、ワークフローをプログラムで作成、スケジューリング、モニタリングするために使用されるオープンソース ツールです。ラボ全体で使用する Airflow に関連する重要な用語をいくつか紹介します。

  • DAG - a DAG(有向非巡回グラフ)とは、スケジュールを設定して実行する体系的なタスクの集まりです。DAG(ワークフローとも呼ばれます)は、標準の Python ファイルで定義されます。
  • 演算子 - 演算子は、ワークフロー内の 1 つのタスクを表します。

Cloud Dataproc とは何ですか?

Cloud Dataproc は、Google Cloud Platform のフルマネージド Apache SparkApache Hadoop サービスです。Cloud Dataproc は他の GCP サービスと簡単に統合できるため、データ処理、分析、機械学習のための強力で包括的なプラットフォームとして使用できます。

演習内容

この Codelab では、次のタスクを行う Apache Airflow ワークフローを Cloud Composer で作成して実行する方法について説明します。

  • Cloud Dataproc クラスタを作成する
  • クラスタで Apache Hadoop ワードカウント ジョブを実行し、結果を Cloud Storage に出力する
  • クラスタを削除する

学習内容

  • Cloud Composer で Apache Airflow ワークフローを作成して実行する方法
  • Cloud Composer と Cloud Dataproc を使用してデータセットの分析を実行する方法
  • Google Cloud Platform Console、Cloud SDK、Airflow ウェブ インターフェースを使用して、Cloud Composer 環境にアクセスする方法

必要なもの

  • GCP アカウント
  • 基本的な CLI の知識
  • Python の基本的な理解

2. GCP の設定

プロジェクトを作成する

Google Cloud Platform プロジェクトを選択または作成します。

後のステップで使用するため、プロジェクト ID をメモしておきます。

新しいプロジェクトを作成する場合は、作成ページの [プロジェクト名] のすぐ下にプロジェクト ID が表示されます。

プロジェクトをすでに作成している場合は、[コンソール ホームページ] の [プロジェクト情報] カードで ID を確認できます。

API を有効にする

Cloud Composer、Cloud Dataproc、Cloud Storage の各 API を有効にします。有効にしたら、[認証情報に移動] ボタンは無視して、チュートリアルの次のステップに進んでください。

Composer 環境を作成する

次の構成で Cloud Composer 環境を作成します

  • 名前: my-composer-environment
  • ロケーション: us-central1
  • ゾーン: us-central1-a

他のすべての構成はデフォルトのままで問題ありません。下部にある [作成] をクリックします。

Cloud Storage バケットを作成する

プロジェクトで、Cloud Storage バケットを次の構成で作成します。

  • 名前: <your-project-id>
  • デフォルトのストレージ クラス: Multi-regional
  • 所在地: 米国
  • アクセス制御モデル: きめ細かい

準備ができたら [作成] を押します。

3. Apache Airflow を設定する

Composer 環境情報を表示する

GCP Console で [環境] ページを開きます。

環境の名前をクリックして、詳細を表示します。

[環境の詳細] ページには、Airflow ウェブ インターフェースの URL、Google Kubernetes Engine のクラスタ ID、Cloud Storage バケットの名前、/dags フォルダのパスなどの情報が表示されます。

Airflow では、DAG(有向非巡回グラフ)とは、スケジュールを設定して実行する体系的なタスクの集まりです。DAG(ワークフローとも呼ばれます)は、標準の Python ファイルで定義されます。Cloud Composer がスケジュールを設定するのは、/dags フォルダ内の DAG のみです。/dags フォルダは、環境の作成時に Cloud Composer が自動的に作成する Cloud Storage バケット内にあります。

Apache Airflow 環境変数を設定する

Apache Airflow 変数は Airflow 固有の概念であり、 環境変数とは異なります。このステップでは、次の 3 つの Airflow 変数を設定します: gcp_projectgcs_bucketgce_zone

gcloud を使用して変数を設定する

まず、Cloud Shell を開きます。Cloud SDK がインストールされています。

環境変数 COMPOSER_INSTANCE を Composer 環境の名前に設定します。

COMPOSER_INSTANCE=my-composer-environment

gcloud コマンドライン ツールを使用して Airflow 変数を設定するには、variables サブコマンドを指定して gcloud composer environments run コマンドを使用します。この gcloud composer コマンドは、Airflow CLI サブコマンド variables を実行します。このサブコマンドは gcloud コマンドライン ツールに引数を渡します。

このコマンドを 3 回実行し、変数をプロジェクトに関連する変数に置き換えます。

次のコマンドを使用して gcp_project を設定します。このとき、<your-project-id> はステップ 2 でメモしたプロジェクト ID に置き換えます。

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcp_project <your-project-id>

出力は次のようになります。

kubeconfig entry generated for us-central1-my-composer-env-123abc-gke.
Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc
[2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449
[2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste
r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
[2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
[2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg

次のコマンドを使用して gcs_bucket を設定します。このとき、<your-bucket-name> はステップ 2 でメモしたバケット ID に置き換えます。推奨事項に従った場合、バケット名はプロジェクト ID と同じになります。出力は前のコマンドと同様になります。

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>

次のコマンドを使用して gce_zone を設定します。出力は前のコマンドと同様になります。

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gce_zone us-central1-a

(省略可)gcloud を使用して変数を表示する

変数の値を表示するには、Airflow CLI サブコマンド variablesget 引数を指定して実行するか、Airflow UI を使用します。

次に例を示します。

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --get gcs_bucket

これは、設定した 3 つの変数(gcp_projectgcs_bucketgce_zone)のいずれでも実行できます。

4. サンプル ワークフロー

ステップ 5 で使用する DAG のコードを見てみましょう。まだファイルをダウンロードする必要はありません。このまま進んでください。

ここで多くが解凍されます。少し分解してみましょう。

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

まず、Airflow のインポートから始めます。

  • airflow.models - Airflow データベース内のデータにアクセスして作成できます。
  • airflow.contrib.operators - コミュニティの演算子が存在する場所。この場合、Cloud Dataproc API にアクセスするには dataproc_operator が必要です。
  • airflow.utils.trigger_rule - 演算子にトリガールールを追加します。トリガールールを使用すると、親のステータスに応じて演算子を実行するかどうかをきめ細かく制御できます。
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep

出力ファイルの場所を指定します。ここで注目すべき行は models.Variable.get('gcs_bucket') です。これにより、Airflow データベースから gcs_bucket 変数値が取得されます。

WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)

input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]
  • WORDCOUNT_JAR - Cloud Dataproc クラスタで最終的に実行する .jar ファイルの場所。GCP でホストされています。
  • input_file - Hadoop ジョブが最終的に計算するデータを含むファイルの場所。ステップ 5 で、データをその場所にアップロードします。
  • wordcount_args - jar ファイルに渡す引数。
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

これにより、前日の午前 0 時を表す datetime オブジェクトが生成されます。たとえば、3 月 4 日の午前 11 時に実行した場合、datetime オブジェクトは 3 月 3 日の午前 0 時を表します。これは、Airflow がスケジューリングを処理する方法に関連しています。詳細については、こちらをご覧ください。

default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

新しい DAG を作成するときは、辞書形式の default_dag_args 変数を指定する必要があります。

  • 'email_on_failure' - タスクが失敗したときにメールアラートを送信するかどうかを示します。
  • 'email_on_retry' - タスクが再試行されたときにメールアラートを送信するかどうかを示します。
  • 'retries' - DAG が失敗した場合に Airflow が試行する再試行回数を示します。
  • 'retry_delay' - Airflow が再試行を試みるまでの待機時間を示します。
  • 'project_id' - DAG に関連付ける GCP プロジェクト ID を指定します。これは、後で Dataproc 演算子で使用します。
with models.DAG(
        'composer_hadoop_tutorial',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

with models.DAG を使用すると、その下のすべてのものが同じ DAG に含まれます。また、3 つの引数が渡されます。

  • 1 つ目の文字列は、作成する DAG に付ける名前です。この例では、composer_hadoop_tutorial を使用します。
  • schedule_interval - datetime.timedelta オブジェクト。ここでは 1 日に設定しています。つまり、この DAG は、'default_dag_args' で設定した 'start_date' の後に 1 日 1 回実行を試みます。
  • default_args - DAG のデフォルト引数を含む、先ほど作成した辞書。

Dataproc クラスタを作成する

次に、dataproc_operator.DataprocClusterCreateOperator を作成します。これにより、Cloud Dataproc クラスタが作成されます。

    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

この演算子内にはいくつかの引数があります。最初の引数以外は、この演算子に固有のものです。

  • task_id - BashOperator と同様に、これは演算子に割り当てる名前で、Airflow UI から確認できます。
  • cluster_name - Cloud Dataproc クラスタに割り当てる名前。ここでは、 composer-hadoop-tutorial-cluster-{{ ds_nodash }} という名前を付けました(詳細については、情報ボックスをご覧ください)。
  • num_workers - Cloud Dataproc クラスタに割り当てるワーカーの数。
  • zone - クラスタを配置する地理的リージョン。Airflow データベースに保存されます。ステップ 3 で設定した 'gce_zone' 変数を読み取ります。
  • master_machine_type - Cloud Dataproc マスターに割り当てるマシンのタイプ。
  • worker_machine_type - Cloud Dataproc ワーカーに割り当てるマシンのタイプ。

Apache Hadoop ジョブを送信する

dataproc_operator.DataProcHadoopOperator を使用すると、Cloud Dataproc クラスタにジョブを送信できます。

    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

いくつかのパラメータを指定します。

  • task_id - この DAG の部分に割り当てる名前。
  • main_jar - クラスタに対して実行する .jar ファイルの場所。
  • cluster_name - ジョブを実行するクラスタの名前。前の演算子と同じです。
  • arguments - コマンドラインから .jar ファイルを実行する場合と同様に、jar ファイルに渡される引数。

クラスタを削除する

最後に作成する演算子は dataproc_operator.DataprocClusterDeleteOperator です。

    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

名前が示すように、この演算子は指定された Cloud Dataproc クラスタを削除します。3 つの引数があります。

  • task_id - BashOperator と同様に、これは演算子に割り当てる名前で、Airflow UI から確認できます。
  • cluster_name - Cloud Dataproc クラスタに割り当てる名前。ここでは、 composer-hadoop-tutorial-cluster-{{ ds_nodash }} という名前を付けました(詳細については、「Dataproc クラスタを作成する」の後の情報ボックスをご覧ください)。
  • trigger_rule - このステップの冒頭のインポートでトリガールールについて簡単に説明しましたが、ここでは実際に使用します。デフォルトでは、アップストリームの演算子がすべて正常に完了しない限り、Airflow 演算子は実行されません。ALL_DONE トリガールールでは、アップストリームの演算子がすべて完了している必要があります。成功したかどうかは関係ありません。つまり、Hadoop ジョブが失敗した場合でも、クラスタを削除します。
  create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

最後に、これらの演算子を特定の順序で実行します。これは、Python ビットシフト演算子を使用して指定できます。この場合、create_dataproc_cluster が常に最初に実行され、次に run_dataproc_hadoop、最後に delete_dataproc_cluster が実行されます。

コードをまとめると、次のようになります。

# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

# [START composer_hadoop_schedule]
with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    # [END composer_hadoop_schedule]

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/code.html#default-variables
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # [START composer_hadoop_steps]
    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
    # [END composer_hadoop_steps]

# [END composer_hadoop]

5. Airflow ファイルを Cloud Storage にアップロードする

DAG を /dags フォルダにコピーする

  1. まず、Cloud Shell を開きます。Cloud SDK がインストールされています。
  2. python サンプル リポジトリのクローンを作成し、composer/workflows ディレクトリに移動します。
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
  1. 次のコマンドを実行して、DAG フォルダの名前を環境変数に設定します。
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \
--location us-central1 --format="value(config.dagGcsPrefix)")
  1. 次の gsutil コマンドを実行して、チュートリアル コードを /dags フォルダが作成された場所にコピーします。
gsutil cp hadoop_tutorial.py $DAGS_FOLDER

出力は次のようになります。

Copying file://hadoop_tutorial.py [Content-Type=text/x-python]...
/ [1 files][  4.1 KiB/  4.1 KiB]
Operation completed over 1 objects/4.1 KiB.

6. Airflow UI を使用する

GCP コンソールを使用して Airflow ウェブ インターフェースにアクセスするには、次の手順を行います。

  1. [**環境**] ページを開きます。
  2. 環境の [Airflow ウェブサーバー] 列で、新しいウィンドウ アイコンをクリックします。新しいウィンドウに Airflow ウェブ UI が表示されます。

Airflow UI について詳しくは、 ウェブ インターフェースへのアクセスをご覧ください。

変数を表示する

先ほど設定した変数は、環境内に保持されています。Airflow UI のメニューバーで [管理] > [変数] を選択すると、変数を確認できます。

[List] タブが選択され、次のキーと値のテーブルが表示されています。キー: gcp_project、値: project-id キー: gcs_bucket、値: gs://bucket-name キー: gce_zone、値: zone

DAG の実行状況を確認する

DAG ファイルを Cloud Storage の dags フォルダにアップロードすると、ファイルが Cloud Composer によって解析されます。エラーが見つからなければ、ワークフローの名前が DAG のリストに表示され、ワークフローは即時実行されるようキューに登録されます。DAG を確認するには、ページ上部の [DAG] をクリックします。

84a29c71f20bff98.png

composer_hadoop_tutorial をクリックして、DAG の詳細ページを開きます。このページでは、ワークフローのタスクと依存関係が図で示されます。

f4f1663c7a37f47c.png

ツールバーで [グラフビュー] をクリックし、各タスクに該当するグラフィックにカーソルを合わせてステータスを表示します。各タスクを囲む線の色もステータスを表しています(緑は実行中、赤は失敗など)。

4c5a0c6fa9f88513.png

[**グラフビュー**] からワークフローをもう一度実行するには、次の手順を行います。

  1. Airflow UI のグラフビューで、create_dataproc_cluster グラフィックをクリックします。
  2. [Clear] をクリックして 3 つのタスクをリセットし、[OK] をクリックして確定します。

fd1b23b462748f47.png

次の GCP Console ページに移動して、composer-hadoop-tutorial ワークフローのステータスと結果を確認することもできます。

  • Cloud Dataproc クラスタ。クラスタの作成と削除をモニタリングします。ワークフローによって作成されるクラスタは一時的なものであり、ワークフローの間にのみ存在し、最後のワークフロー タスクの一部として削除されます。
  • Cloud Dataproc ジョブ。Apache Hadoop のワードカウント ジョブを表示またはモニタリングします。ジョブ ID をクリックすると、ジョブのログ出力を確認できます。
  • Cloud Storage ブラウザ。この Codelab 用に作成した Cloud Storage バケット内の wordcount フォルダのワード数の結果を表示します。

7. クリーンアップ

この Codelab で使用したリソースについて GCP アカウントに課金されないようにする手順は次のとおりです。

  1. (省略可)データを保存するには、データをダウンロードして、Cloud Composer 環境の Cloud Storage バケットと、この Codelab 用に作成した Storage バケットから取得します。
  2. この Codelab 用に作成した Cloud Storage バケットを削除します
  3. 環境の Cloud Storage バケットを削除します
  4. Cloud Composer 環境を削除します。環境を削除しても、環境の Storage バケットは削除されません。

必要に応じて、プロジェクトを削除することもできます。

  1. GCP Console で [Projects] ページに移動します。
  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ボックスにプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。