Preprocessing BigQuery Data with PySpark on Dataproc

1. 概要

この Codelab では、Apache SparkGoogle Cloud PlatformDataproc を使用してデータ処理パイプラインを作成する方法について説明します。データ サイエンスやデータ エンジニアリングでは、あるストレージの場所からデータを読み取り、そのデータに対して変換を行い、別のストレージの場所に書き込むのが一般的なユースケースです。一般的な変換には、データの内容の変更、不要な情報の削除、ファイル形式の変更などがあります。

この Codelab では、Apache Spark について学び、Dataproc と PySpark(Apache Spark の Python API)、BigQueryGoogle Cloud Storage、Reddit のデータを使用してサンプル パイプラインを実行します。

2. Apache Spark の概要(省略可)

同ウェブサイトによると、「Apache Spark は、大規模なデータ処理のための統合分析エンジンです。"メモリ内で並列にデータを分析して処理できるため、複数の異なるマシンやノード間で大規模な並列計算を行うことができます。当初は従来の MapReduce のアップグレードとして 2014 年にリリースされましたが、今でも大規模なコンピューティングを実行するための最も一般的なフレームワークの一つです。Apache Spark は Scala で記述されており、その後、Scala、Java、Python、R の API が用意されています。データに対して SQL クエリを実行する Spark SQL、ストリーミング データ用の Spark Streaming、ML 用の MLlib、グラフ処理用の GraphX など、多数のライブラリが含まれており、これらはすべて Apache Spark エンジンで実行されます。

32add0b6a47bafbc.png

Spark は単独で実行することも、スケーリングのために YarnMesosKubernetes などのリソース管理サービスを活用することもできます。この Codelab では、Yarn を利用する Dataproc を使用します。

Spark のデータは元々、RDD(復元力の高い分散データセット)と呼ばれるものにメモリに読み込まれていました。Spark での開発には、2 つの新しいカラム型データ型(型付き Dataset と型なし Dataframe)が追加されています。大まかに言うと、RDD はあらゆるタイプのデータに適していますが、データセットと DataFrame は表形式データに最適化されています。データセットは Java API と Scala API でのみ使用できるため、この Codelab では PySpark Dataframe API を使用します。詳細については、Apache Spark のドキュメントをご覧ください。

3. ユースケース

多くの場合、データ エンジニアは、データ サイエンティストがデータに簡単にアクセスできることを必要としています。しかし、データは最初はダーティ(現状のままの分析に使用することは困難)であることが多く、実際に活用するにはクリーニングが必要です。たとえば、ウェブからスクレイピングされたデータには、奇妙なエンコーディングや無関係な HTML タグが含まれている可能性があります。

このラボでは、BigQuery から Dataproc でホストされている Spark クラスタに Reddit の投稿の形式で一連のデータを読み込み、有用な情報を抽出して、処理されたデータを ZIP 形式の CSV ファイルとして Google Cloud Storage に保存します。

be2a4551ece63bfc.png

あなたの会社の最高データ サイエンティストは、各チームが自然言語処理のさまざまな問題に取り組むことに関心を持っています。具体的には、サブレディット「r/food」のデータの分析に関心があります。2017 年 1 月から 2019 年 8 月までのバックフィルから始まるデータダンプのパイプラインを作成します。

4. BigQuery Storage API を介した BigQuery へのアクセス

tabledata.list API メソッドを使用して BigQuery からデータを取得すると、時間がかかり、データ量が増えるため効率的でないことが判明する場合があります。このメソッドは JSON オブジェクトのリストを返します。データセット全体を読み取るには、一度に 1 ページずつ順次読み取る必要があります。

BigQuery Storage API を使用すると、RPC ベースのプロトコルを使用した BigQuery 内のデータへのアクセスが大幅に改善されます。データの読み取りと書き込みを並行して行うだけでなく、Apache AvroApache Arrow などのさまざまなシリアル化形式にも対応しています。大まかに言うと、特に大規模なデータセットでは、パフォーマンスが大幅に向上します。

この Codelab では、spark-bigquery-connector を使用して、BigQuery と Spark の間でデータの読み取りと書き込みを行います。

5. プロジェクトの作成

Google Cloud Platform コンソール(console.cloud.google.com)にログインし、新しいプロジェクトを作成します。

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

次に、Google Cloud リソースを使用するために、Cloud コンソールで課金を有効にする必要があります。

この Codelab を実行するために必要な費用は数ドル以上です。ただし、使用するリソースを増やす場合や、リソースを実行したままにする場合は、コストが高くなる可能性があります。この Codelab の最後のセクションでは、プロジェクトのクリーンアップについて説明します。

Google Cloud Platform の新規ユーザーは、$300 分の無料トライアルをご利用いただけます。

6. 環境の設定

次に、以下の方法で環境を設定します。

  • Compute Engine、Dataproc、BigQuery Storage API の有効化
  • プロジェクト設定の構成
  • Dataproc クラスタの作成
  • Google Cloud Storage バケットを作成する

API を有効にして環境を構成する

Cloud コンソールの右上にあるボタンをクリックして、Cloud Shell を開きます。

a10c47ee6ca41c54.png

Cloud Shell が読み込まれたら、次のコマンドを実行して Compute Engine、Dataproc、BigQuery Storage API を有効にします。

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

プロジェクトのプロジェクト ID を設定します。プロジェクトの選択ページに移動して、該当するプロジェクトを検索します。プロジェクト名は異なる場合があります。

e682e8227aa3c781.png

76d45fb295728542.png

次のコマンドを実行して、プロジェクト ID を設定します。

gcloud config set project <project_id>

こちらのリストから 1 つ選択して、プロジェクトのリージョンを設定します。たとえば、us-central1 のようにします。

gcloud config set dataproc/region <region>

Dataproc クラスタの名前を選択し、その環境変数を作成します。

CLUSTER_NAME=<cluster_name>

Dataproc クラスタの作成

次のコマンドを実行して、Dataproc クラスタを作成します。

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

このコマンドが完了するまでに数分かかります。コマンドを分割するには:

先ほど指定した名前で Dataproc クラスタの作成が開始されます。beta API を使用すると、コンポーネント ゲートウェイなどの Dataproc のベータ版機能が有効になります。

gcloud beta dataproc clusters create ${CLUSTER_NAME}

これにより、ワーカーに使用するマシンのタイプが設定されます。

--worker-machine-type n1-standard-8

これにより、クラスタのワーカー数が設定されます。

--num-workers 8

これにより、Dataproc のイメージ バージョンが設定されます。

--image-version 1.5-debian

これにより、クラスタで使用する初期化アクションが構成されます。ここでは、pip 初期化アクションを含めています。

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

これは、クラスタに含めるメタデータです。ここでは、pip 初期化アクションのメタデータを指定しています。

--metadata 'PIP_PACKAGES=google-cloud-storage'

これにより、クラスタにインストールするオプション コンポーネントが設定されます。

--optional-components=ANACONDA

これによりコンポーネント ゲートウェイが有効になり、Dataproc のコンポーネント ゲートウェイを使用して Zeppelin、Jupyter、Spark 履歴などの一般的な UI を表示できるようになります。

--enable-component-gateway

Dataproc の概要について詳しくは、こちらの Codelab をご覧ください。

Google Cloud Storage バケットを作成する

ジョブ出力には Google Cloud Storage バケットが必要です。バケットの一意の名前を確認し、次のコマンドを実行して新しいバケットを作成します。バケット名はすべてのユーザーのすべての Google Cloud プロジェクトで一意であるため、異なる名前でこれを何度か試すことが必要になる場合があります。ServiceException を受信しない場合は、バケットは正常に作成されています。

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}

7. 探索的データ分析

前処理を実行する前に、処理するデータの性質について詳しく知る必要があります。そのために、2 つのデータ探索方法を見ていきます。まず、BigQuery ウェブ UI を使用して元データを表示し、PySpark と Dataproc を使用してサブレディットごとの投稿数を計算します。

BigQuery ウェブ UI の使用

まず、BigQuery ウェブ UI を使用してデータを表示します。Cloud コンソールのメニュー アイコンから下にスクロールして、[BigQuery] をクリックします。BigQuery ウェブ UI を開きます

242a597d7045b4da.png

次に、BigQuery ウェブ UI のクエリエディタで次のコマンドを実行します。すると、2017 年 1 月のデータの完全な 10 行が返されます。

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

ページをスクロールすると、使用可能なすべての列といくつかの例を確認できます。特に、各投稿のテキスト コンテンツを表す「title」という 2 つの列が表示されます。「selftext」は投稿の本文です「created_utc」などの列もこれは投稿が作成された日時と “subreddit”ですこれは投稿が存在するサブレディットです

PySpark ジョブの実行

Cloud Shell で次のコマンドを実行して、サンプルコードを含むリポジトリのクローンを作成し、正しいディレクトリに移動します。

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

PySpark を使用して、サブレディットごとに投稿数を計算できます。次の手順で実行する前に、Cloud Editor を開いてスクリプト cloud-dataproc/codelabs/spark-bigquery を読みます。

5d965c6fb66dbd81.png

797cf71de3449bdb.png

[ターミナルを開く] をクリックします。] ボタンをクリックして Cloud Shell に戻り、次のコマンドを実行して最初の PySpark ジョブを実行します。

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

このコマンドを使用すると、Jobs API を介してジョブを Dataproc に送信できます。ここでは、ジョブタイプに pyspark を指定しています。クラスタ名、オプションのパラメータ 、ジョブを含むファイルの名前を指定できます。ここでは、spark-bigquery-connector をジョブに含めることができるパラメータ --jars を指定しています。--driver-log-levels root=FATAL を使用してログ出力レベルを設定することもできます。これにより、エラーを除くすべてのログ出力が抑制されます。Spark のログは、ノイズが多くなる傾向があります。

このコマンドの実行には数分かかり、最終的な出力は次のようになります。

6c185228db47bb18.png

8. Dataproc と Spark の UI の探索

Dataproc で Spark ジョブを実行する場合、2 つの UI にアクセスしてジョブ / クラスタのステータスを確認できます。1 つ目は Dataproc UI です。この UI は、メニュー アイコンをクリックして [Dataproc] まで下にスクロールすると見つかります。ここでは、現在使用可能なメモリ、保留中のメモリ、ワーカー数を確認できます。

6f2987346d15c8e2.png

[ジョブ] タブをクリックして、完了したジョブを表示することもできます。特定のジョブのジョブ ID をクリックすると、ジョブのログや出力など、ジョブの詳細を確認できます。114d90129b0e4c88.png

1b2160f0f484594a.png

Spark UI を表示することもできます。ジョブページで戻る矢印をクリックし、[ウェブ インターフェース] をクリックします。コンポーネント ゲートウェイにいくつかのオプションが表示されます。これらの多くは、クラスタの設定時にオプション コンポーネントを使用して有効にできます。このラボでは、[Spark History Server] をクリックします。

5da7944327d193dc.png

6a349200289c69c1.png e63b36bdc90ff610.png

次のウィンドウが開きます。

8f6786760f994fe8.png

完了したジョブはすべてここに表示されます。application_id をクリックすると、ジョブの詳細を確認できます。同様に [未完了のアプリケーションを表示]を クリックして現在実行中のジョブをすべて表示できます。

9. バックフィル ジョブの実行

次に、メモリにデータを読み込み、必要な情報を抽出し、出力を Google Cloud Storage バケットにダンプするジョブを実行します。抽出したのは "title" と "body" です(未加工テキスト)と「timestamp created」確認できます。次に、このデータを取得して CSV に変換し、zip 形式で圧縮し、gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz という URI のバケットに読み込みます。

再度 Cloud Editor を参照して、cloud-dataproc/codelabs/spark-bigquery/backfill.shコードを読むことができます。これは、cloud-dataproc/codelabs/spark-bigquery/backfill.pyコードを実行するためのラッパー スクリプトです。

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

すぐにジョブ完了メッセージが多数表示されるはずです。ジョブが完了するまでに 15 分ほどかかる場合があります。また、gsutil を使用してストレージ バケットを再確認し、データ出力が成功していることを確認することもできます。すべてのジョブが完了したら、次のコマンドを実行します。

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

次の出力が表示されます。

a7c3c7b2e82f9fca.png

これで、Reddit コメントデータのバックフィルが正常に完了しました。このデータに基づいてモデルを構築する方法に関心がある場合は、Spark-NLP の Codelab に進んでください。

10. クリーンアップ

このクイックスタートの完了後に GCP アカウントに不要な料金が発生しないようにするには:

  1. 作成した環境の Cloud Storage バケットを削除します。
  2. Dataproc 環境を削除します

この Codelab 専用のプロジェクトを作成した場合は、必要に応じてプロジェクトを削除することもできます。

  1. GCP コンソールで [プロジェクト] ページに移動します。
  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ボックスにプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

ライセンス

このソフトウェアは、クリエイティブ・コモンズの表示 3.0 汎用ライセンス、および Apache 2.0 ライセンスにより使用許諾されています。