Preprocessing BigQuery Data with PySpark on Dataproc

1. 概要

この Codelab では、Google Cloud PlatformDataprocApache Spark を使用してデータ処理パイプラインを作成する方法について説明します。あるストレージの場所からデータを読み込んで変換し、別のストレージの場所に書き込むのが、データ サイエンスとデータ エンジニアリングでの一般的なユースケースです。一般的な変換には、データの内容の変更、不要な情報の除去、ファイル形式の変更などがあります。

この Codelab では、Apache Spark について学び、Dataproc で PySpark(Apache Spark の Python API)、BigQueryGoogle Cloud Storage、Reddit のデータを使用してサンプル パイプラインを実行します。

2. Apache Spark の概要(省略可)

ウェブサイトによると、「Apache Spark は、大規模なデータ処理のための統合分析エンジンです。」データを並列かつインメモリで分析および処理できるため、複数の異なるマシンとノードにまたがる大規模な並列計算が可能になります。2014 年に従来の MapReduce のアップグレードとしてリリースされたこのフレームワークは、現在でも大規模な計算を行うための最も人気のあるフレームワークの 1 つです。Apache Spark は Scala で記述されており、Scala、Java、Python、R の API があります。データに対して SQL クエリを実行する Spark SQL、データをストリーミングする Spark Streaming、ML 用の MLlib、グラフ処理用の GraphX など、さまざまなライブラリが含まれています。これらはすべて Apache Spark エンジンで実行されます。

32add0b6a47bafbc.png

Spark は単独で実行することも、YarnMesosKubernetes などのリソース管理サービスを利用してスケーリングすることもできます。この Codelab では、Yarn を使用する Dataproc を使用します。

Spark のデータは、元々は RDD(耐障害性分散データセット)と呼ばれるメモリに読み込まれていました。Spark の開発では、列形式の新しいデータ型が 2 つ追加されました。型付きの Dataset と型なしの Dataframe です。大まかに言えば、RDD はあらゆる種類のデータに適していますが、Dataset と Dataframe は表形式データ用に最適化されています。Datasets は Java と Scala の API でのみ使用できるため、この Codelab では PySpark Dataframe API を使用します。詳細については、Apache Spark のドキュメントをご覧ください。

3. ユースケース

データ エンジニアは、データ サイエンティストが簡単にアクセスできるデータを必要とすることがよくあります。ただし、データは最初は汚れている(現状のままでは分析に使用しにくい)ことが多く、有用に活用するにはクリーンアップする必要があります。たとえば、ウェブからスクレイピングされたデータには、奇妙なエンコードや余分な HTML タグが含まれている可能性があります。

このラボでは、BigQuery から Reddit 投稿の形式でデータセットを Dataproc でホストされている Spark クラスタに読み込み、有用な情報を抽出して、処理されたデータを圧縮 CSV ファイルとして Google Cloud Storage に保存します。

be2a4551ece63bfc.png

貴社のチーフ データ サイエンティストは、チームがさまざまな自然言語処理の問題に取り組むことに関心を持っています。特に、サブレディット「r/food」のデータの分析に関心があります。2017 年 1 月から 2019 年 8 月までのバックフィルから始まるデータダンプ用のパイプラインを作成します。

4. BigQuery Storage API を介して BigQuery にアクセスする

tabledata.list API メソッドを使用して BigQuery からデータを取得すると、データ量が増加するにつれて時間がかかり、効率が低下する可能性があります。このメソッドは JSON オブジェクトのリストを返します。データセット全体を読み取るには、一度に 1 ページずつ順番に読み取る必要があります。

BigQuery Storage API は、RPC ベースのプロトコルを使用して BigQuery 内のデータへのアクセスを大幅に改善します。データの読み取りと書き込みを並行して行うことができ、Apache AvroApache Arrow などのさまざまなシリアル化形式をサポートしています。大まかに言えば、特に大規模なデータセットでパフォーマンスが大幅に向上します。

この Codelab では、spark-bigquery-connector を使用して、BigQuery と Spark の間でデータを読み書きします。

5. プロジェクトの作成

console.cloud.google.com で Google Cloud Platform コンソールにログインし、新しいプロジェクトを作成します。

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

次に、Google Cloud リソースを使用するために、Cloud コンソールで課金を有効にする必要があります。

この Codelab の操作をすべて行って、費用が生じたとしても、少額です。この Codelab の最後のセクションでは、プロジェクトのクリーンアップについて説明します。

Google Cloud Platform の新規ユーザーは、300 ドル分の無料トライアルをご利用いただけます。

6. 環境設定

次に、次の手順で環境を設定します。

  • Compute Engine、Dataproc、BigQuery Storage API を有効にする
  • プロジェクトの設定
  • Dataproc クラスタの作成
  • Google Cloud Storage バケットを作成する

API の有効化と環境の構成

Cloud コンソールの右上にあるボタンをクリックして Cloud Shell を開きます。

a10c47ee6ca41c54.png

Cloud Shell が読み込まれたら、次のコマンドを実行して Compute Engine、Dataproc、BigQuery Storage API を有効にします。

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

プロジェクトのプロジェクト ID を設定します。プロジェクト セレクタのページに移動してプロジェクトを検索すると確認できます。これはプロジェクト名と同じでなくてもかまいません。

e682e8227aa3c781.png

76d45fb295728542.png

次のコマンドを実行してプロジェクト ID を設定します。

gcloud config set project <project_id>

こちらのリストからプロジェクトのリージョンを選択します。たとえば、us-central1 です。

gcloud config set dataproc/region <region>

Dataproc クラスタの名前を選択して、その環境変数を作成します。

CLUSTER_NAME=<cluster_name>

Dataproc クラスタの作成

次のコマンドを実行して Dataproc クラスタを作成します。

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

このコマンドが完了するまでに数分かかります。コマンドを分解すると次のようになります。

これにより、先ほど指定した名前の Dataproc クラスタの作成が開始されます。beta API を使用すると、コンポーネント ゲートウェイなどの Dataproc のベータ版機能を有効にできます。

gcloud beta dataproc clusters create ${CLUSTER_NAME}

これにより、ワーカーに使用するマシンのタイプが設定されます。

--worker-machine-type n1-standard-8

これにより、クラスタのワーカー数が設定されます。

--num-workers 8

これにより、Dataproc のイメージ バージョンが設定されます。

--image-version 1.5-debian

これにより、クラスタで使用される初期化アクションが構成されます。ここでは、pip 初期化アクションを含めています。

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

これは、クラスタに含めるメタデータです。ここでは、pip 初期化アクションのメタデータを指定しています。

--metadata 'PIP_PACKAGES=google-cloud-storage'

これにより、クラスタにインストールされるオプション コンポーネントが設定されます。

--optional-components=ANACONDA

これにより、コンポーネント ゲートウェイが有効になり、Dataproc のコンポーネント ゲートウェイを使用して、Zeppelin、Jupyter、Spark History などの一般的な UI を表示できます。

--enable-component-gateway

Dataproc の詳細については、こちらの codelab をご覧ください。

Google Cloud Storage バケットを作成する

ジョブの出力には Google Cloud Storage バケットが必要です。バケットの一意の名前を決定し、次のコマンドを実行して新しいバケットを作成します。バケット名は、すべてのユーザーのすべての Google Cloud プロジェクトで一意であるため、名前を変えて数回試す必要がある場合があります。ServiceException が返されなかった場合、バケットは正常に作成されています。

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}

7. 探索的データ分析

前処理を行う前に、扱うデータの性質について詳しく学ぶ必要があります。そのために、データ探索の 2 つの方法について説明します。まず、BigQuery ウェブ UI を使用して一部の元データを表示し、次に PySpark と Dataproc を使用してサブディビジョンあたりの投稿数を計算します。

BigQuery ウェブ UI の使用

まず、BigQuery ウェブ UI を使用してデータを表示します。Cloud コンソールのメニュー アイコンから下にスクロールし、[BigQuery] をクリックして BigQuery ウェブ UI を開きます。

242a597d7045b4da.png

次に、BigQuery ウェブ UI のクエリエディタで次のコマンドを実行します。これにより、2017 年 1 月のデータの完全な行が 10 行返されます。

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

ページをスクロールすると、使用可能なすべての列と例を確認できます。特に、各投稿のテキスト コンテンツを表す 2 つの列(「title」と「selftext」)が表示されます。後者は投稿の本文です。また、投稿が作成された UTC 時刻を表す「created_utc」や、投稿が存在するサブ reddit を表す「subreddit」などの他の列にも注目してください。

PySpark ジョブの実行

Cloud Shell で次のコマンドを実行して、サンプルコードを含むリポジトリのクローンを作成し、正しいディレクトリに移動します。

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

PySpark を使用して、各サブ reddit に存在する投稿の数を特定できます。Cloud エディタを開いてスクリプト cloud-dataproc/codelabs/spark-bigquery を読み取り、次のステップで実行できます。

5d965c6fb66dbd81.png

797cf71de3449bdb.png

Cloud エディタの [ターミナルを開く] ボタンをクリックして Cloud Shell に戻り、次のコマンドを実行して最初の PySpark ジョブを実行します。

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

このコマンドを使用すると、Jobs API を介して Dataproc にジョブを送信できます。ここでは、ジョブタイプを pyspark と指定しています。クラスタ名、オプションのパラメータ 、ジョブを含むファイルの名前を指定できます。ここでは、spark-bigquery-connector をジョブに含めることができるパラメータ --jars を指定しています。--driver-log-levels root=FATAL を使用してログ出力レベルを設定することもできます。これにより、エラーを除くすべてのログ出力が抑制されます。Spark のログはノイズが多い傾向があります。

実行には数分かかる予定です。最終的な出力は次のようになります。

6c185228db47bb18.png

8. Dataproc と Spark の UI の確認

Dataproc で Spark ジョブを実行する場合は、2 つの UI を使用してジョブやクラスタのステータスを確認できます。1 つ目は Dataproc UI です。メニュー アイコンをクリックして、Dataproc まで下にスクロールすると表示されます。ここでは、使用可能な現在のメモリ、保留中のメモリ、ワーカーの数を確認できます。

6f2987346d15c8e2.png

[ジョブ] タブをクリックして、完了したジョブを確認することもできます。特定のジョブのジョブ ID をクリックすると、そのジョブのログや出力などのジョブの詳細を確認できます。114d90129b0e4c88.png

1b2160f0f484594a.png

Spark UI を表示することもできます。ジョブページで、戻る矢印をクリックし、[ウェブ インターフェース] をクリックします。[コンポーネント ゲートウェイ] にいくつかのオプションが表示されます。これらの多くは、クラスタの設定時にオプション コンポーネントで有効にできます。このラボでは、[Spark History Server] をクリックします。

5da7944327d193dc.png

6a349200289c69c1.png e63b36bdc90ff610.png

次のウィンドウが開きます。

8f6786760f994fe8.png

完了したジョブはすべてここに表示されます。任意の application_id をクリックすると、ジョブの詳細を確認できます。同様に、ランディング ページの一番下にある [未完了の申請を表示] をクリックすると、現在実行中のすべてのジョブが表示されます。

9. バックフィル ジョブの実行

次に、データをメモリに読み込み、必要な情報を抽出して、出力を Google Cloud Storage バケットにダンプするジョブを実行します。Reddit のコメントごとに「title」、「body」(未加工テキスト)、および「timestamp created」を抽出します。次に、このデータを CSV に変換し、圧縮して、URI が gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz のバケットに読み込みます。

Cloud エディタに戻って、cloud-dataproc/codelabs/spark-bigquery/backfill.pyコードを実行するラッパー スクリプトである cloud-dataproc/codelabs/spark-bigquery/backfill.shコードを確認します。

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

しばらくすると、ジョブの完了メッセージが多数表示されます。ジョブが完了するまでに最大 15 分かかる場合があります。gsutil を使用してストレージ バケットを再確認し、データ出力が正常に行われたことを確認することもできます。すべてのジョブが完了したら、次のコマンドを実行します。

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

次の出力が表示されます。

a7c3c7b2e82f9fca.png

Reddit コメントデータのバックアップが正常に完了しました。このデータの上にモデルを構築する方法については、Spark-NLP Codelab に進んでください。

10. クリーンアップ

このクイックスタートの完了後に GCP アカウントに不要な請求が発生しないようにするには:

  1. 環境用に作成した Cloud Storage バケットを削除します
  2. Dataproc 環境を削除します

この Codelab 専用のプロジェクトを作成した場合は、必要に応じてそのプロジェクトを削除することもできます。

  1. GCP Console で [プロジェクト] ページに移動します。
  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ボックスにプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

ライセンス

このコンテンツは、クリエイティブ コモンズの表示 3.0 汎用ライセンスと Apache 2.0 ライセンスにより使用許諾されています。