Preprocessing BigQuery Data with PySpark on Dataproc

1. 概要

この Codelab では、Google Cloud PlatformDataprocApache Spark を使用してデータ処理パイプラインを作成する方法について説明します。あるストレージの場所からデータを読み込んで変換し、別のストレージの場所に書き込むのが、データ サイエンスとデータ エンジニアリングでの一般的なユースケースです。一般的な変換には、データの内容の変更、不要な情報の削除、ファイル形式の変更などがあります。

この Codelab では、Apache Spark について学習し、Dataproc と PySpark(Apache Spark の Python API)、BigQueryGoogle Cloud Storage、Reddit のデータを使用してサンプル パイプラインを実行します。

2. Apache Spark の概要(省略可)

ウェブサイトによると、「Apache Spark は、大規模なデータ処理のための統合分析エンジンです。」これにより、データを並列でメモリ内で分析および処理できるため、複数の異なるマシンとノードで大規模な並列計算が可能になります。もともとは 2014 年に従来の MapReduce のアップグレードとしてリリースされましたが、現在でも大規模な計算を実行するための最も人気のあるフレームワークの 1 つです。Apache Spark は Scala で記述されており、Scala、Java、Python、R の API があります。これには、データに対して SQL クエリを実行するための Spark SQL、データをストリーミングするための Spark Streaming、ML 用の MLlib、グラフ処理用の GraphX など、多数のライブラリが含まれています。これらはすべて Apache Spark エンジンで実行されます。

32add0b6a47bafbc.png

Spark は単独で実行することも、スケーリングに YarnMesosKubernetes などのリソース管理サービスを利用することもできます。この Codelab では、Yarn を利用する Dataproc を使用します。

Spark のデータは、元々 RDD(耐障害性分散データセット)と呼ばれるメモリに読み込まれていました。Spark の開発では、型付きの Dataset と型なしの Dataframe という 2 つの新しい列形式のデータ型が追加されました。大まかに言うと、RDD はあらゆるタイプのデータに適していますが、Dataset と DataFrame は表形式のデータに最適化されています。データセットは Java API と Scala API でのみ使用できるため、この Codelab では PySpark Dataframe API を使用します。詳細については、Apache Spark のドキュメントをご覧ください。

3. ユースケース

データ エンジニアは、データ サイエンティストがデータに簡単にアクセスできるようにする必要があることがよくあります。ただし、データは多くの場合、最初は汚れており(現在の状態では分析に使用するのが難しい)、有用なものにするにはクリーニングが必要です。たとえば、ウェブからスクレイピングされたデータには、奇妙なエンコードや不要な HTML タグが含まれていることがあります。

このラボでは、BigQuery から Reddit 投稿の形式で Dataproc でホストされている Spark クラスタにデータセットを読み込み、有用な情報を抽出して、処理されたデータを圧縮された CSV ファイルとして Google Cloud Storage に保存します。

be2a4551ece63bfc.png

会社のチーフ データ サイエンティストは、チームにさまざまな自然言語処理の問題に取り組んでほしいと考えています。具体的には、subreddit「r/food」のデータを分析することに関心があります。2017 年 1 月から 2019 年 8 月までのバックフィルから始まるデータダンプのパイプラインを作成します。

4. BigQuery Storage API を介して BigQuery にアクセスする

tabledata.list API メソッドを使用して BigQuery からデータを取得すると、データ量の増加に伴い、時間がかかり、効率が悪くなる可能性があります。このメソッドは JSON オブジェクトのリストを返します。データセット全体を読み取るには、一度に 1 ページずつ順番に読み取る必要があります。

BigQuery Storage API は、RPC ベースのプロトコルを使用して BigQuery のデータにアクセスする際のパフォーマンスを大幅に向上させます。並列でのデータの読み取りと書き込み、Apache AvroApache Arrow などのさまざまなシリアル化形式をサポートしています。大まかに言うと、これによりパフォーマンスが大幅に向上します。特に大規模なデータセットで効果があります。

この Codelab では、BigQuery と Spark の間でデータの読み取りと書き込みを行うために spark-bigquery-connector を使用します。

5. プロジェクトの作成

console.cloud.google.com で Google Cloud Platform コンソールにログインし、新しいプロジェクトを作成します。

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

次に、Google Cloud リソースを使用するために、Cloud Console で課金を有効にする必要があります。

この Codelab の操作をすべて行って、費用が生じたとしても、少額です。ただし、リソースの使用量を増やしたり、実行したままにしたりすると、費用が増加する可能性があります。この Codelab の最後のセクションでは、プロジェクトのクリーンアップについて説明します。

Google Cloud Platform の新規ユーザーは、$300 の無料トライアルをご利用いただけます。

6. 環境をセットアップする

環境を設定する手順は次のとおりです。

  • Compute Engine、Dataproc、BigQuery Storage API を有効にする
  • プロジェクト設定を構成する
  • Dataproc クラスタの作成
  • Google Cloud Storage バケットを作成する

API の有効化と環境の構成

Cloud Console の右上にあるボタンを押して、Cloud Shell を開きます。

a10c47ee6ca41c54.png

Cloud Shell が読み込まれたら、次のコマンドを実行して Compute Engine、Dataproc、BigQuery Storage API を有効にします。

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

プロジェクトのプロジェクト ID を設定します。プロジェクト セレクタのページに移動してプロジェクトを検索すると、確認できます。これはプロジェクト名と同じでなくても構いません。

e682e8227aa3c781.png

76d45fb295728542.png

次のコマンドを実行して、プロジェクト ID を設定します。

gcloud config set project <project_id>

こちらのリストから 1 つを選択して、プロジェクトのリージョンを設定します。たとえば、us-central1 です。

gcloud config set dataproc/region <region>

Dataproc クラスタの名前を選択し、その環境変数を作成します。

CLUSTER_NAME=<cluster_name>

Dataproc クラスタの作成

次のコマンドを実行して、Dataproc クラスタを作成します。

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

このコマンドが完了するまでに数分かかります。コマンドの内訳は次のとおりです。

これにより、先ほど指定した名前で Dataproc クラスタの作成が開始されます。beta API を使用すると、コンポーネント ゲートウェイなどの Dataproc のベータ版機能が有効になります。

gcloud beta dataproc clusters create ${CLUSTER_NAME}

これにより、ワーカーに使用するマシンのタイプが設定されます。

--worker-machine-type n1-standard-8

これにより、クラスタのワーカー数が設定されます。

--num-workers 8

これにより、Dataproc のイメージ バージョンが設定されます。

--image-version 1.5-debian

これにより、クラスタで使用される初期化アクションが構成されます。ここでは、pip 初期化アクションを含めています。

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

これは、クラスタに含めるメタデータです。ここでは、pip 初期化アクションのメタデータを提供しています。

--metadata 'PIP_PACKAGES=google-cloud-storage'

これにより、クラスタにインストールされるオプション コンポーネントが設定されます。

--optional-components=ANACONDA

これにより、コンポーネント ゲートウェイが有効になり、Dataproc のコンポーネント ゲートウェイを使用して、Zeppelin、Jupyter、Spark History などの一般的な UI を表示できます。

--enable-component-gateway

Dataproc の詳細については、こちらの Codelab をご覧ください。

Google Cloud Storage バケットを作成する

ジョブの出力用に Google Cloud Storage バケットが必要です。バケットの一意の名前を決定し、次のコマンドを実行して新しいバケットを作成します。バケット名は、すべてのユーザーのすべての Google Cloud プロジェクトで一意であるため、異なる名前で数回試す必要がある場合があります。ServiceException が返されなければ、バケットは正常に作成されています。

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}

7. 探索的データ分析

前処理を行う前に、扱うデータの性質について詳しく知っておく必要があります。このために、2 つのデータ探索方法について説明します。まず、BigQuery ウェブ UI を使用して未加工のデータを表示し、次に PySpark と Dataproc を使用してサブレディットあたりの投稿数を計算します。

BigQuery ウェブ UI の使用

まず、BigQuery ウェブ UI を使用してデータを表示します。Cloud Console のメニュー アイコンから下にスクロールして [BigQuery] を押し、BigQuery ウェブ UI を開きます。

242a597d7045b4da.png

次に、BigQuery ウェブ UI のクエリエディタで次のコマンドを実行します。これにより、2017 年 1 月のデータの完全な行が 10 行返されます。

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

ページを横にスクロールすると、使用可能なすべての列といくつかの例が表示されます。特に、各投稿のテキスト コンテンツを表す 2 つの列(「title」と「selftext」)が表示されます。後者は投稿の本文です。また、投稿が作成された UTC 時刻を示す「created_utc」や、投稿が存在するサブレディットを示す「subreddit」などの他の列にも注目してください。

PySpark ジョブの実行

Cloud Shell で次のコマンドを実行して、サンプルコードを含むリポジトリのクローンを作成し、正しいディレクトリに移動します。

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

PySpark を使用して、各サブレディットに存在する投稿の数を特定できます。次のステップで実行する前に、Cloud エディタを開いて スクリプト cloud-dataproc/codelabs/spark-bigquery を読み取ることができます。

5d965c6fb66dbd81.png

797cf71de3449bdb.png

Cloud エディタの [ターミナルを開く] ボタンをクリックして Cloud Shell に戻り、次のコマンドを実行して最初の PySpark ジョブを実行します。

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

このコマンドを使用すると、Jobs API を介して Dataproc にジョブを送信できます。ここでは、ジョブタイプを pyspark として示しています。クラスタ名、オプションのパラメータ 、ジョブを含むファイルの名前を指定できます。ここでは、ジョブに spark-bigquery-connector を含めることができる --jars パラメータを指定しています。--driver-log-levels root=FATAL を使用してログ出力レベルを設定することもできます。これにより、エラー以外のすべてのログ出力が抑制されます。Spark ログはノイズが多くなりがちです。

このコマンドは実行に数分かかります。最終的な出力は次のようになります。

6c185228db47bb18.png

8. Dataproc と Spark の UI を調べる

Dataproc で Spark ジョブを実行する場合、ジョブ / クラスタのステータスを確認するための 2 つの UI にアクセスできます。1 つ目は Dataproc UI です。メニュー アイコンをクリックして Dataproc までスクロールすると表示されます。ここでは、現在の使用可能なメモリ、保留中のメモリ、ワーカーの数を確認できます。

6f2987346d15c8e2.png

[ジョブ] タブをクリックして、完了したジョブを表示することもできます。特定のジョブのジョブ ID をクリックすると、そのジョブのログや出力などのジョブの詳細を確認できます。114d90129b0e4c88.png

1b2160f0f484594a.png

Spark UI を表示することもできます。ジョブページで、戻る矢印をクリックし、[ウェブ インターフェース] をクリックします。コンポーネント ゲートウェイの下に複数のオプションが表示されます。これらの多くは、クラスタの設定時にオプション コンポーネントを介して有効にできます。このラボでは、[Spark History Server] をクリックします。

5da7944327d193dc.png

6a349200289c69c1.png e63b36bdc90ff610.png

次のウィンドウが開きます。

8f6786760f994fe8.png

完了したジョブはすべてここに表示されます。application_id をクリックすると、ジョブの詳細を確認できます。同様に、ランディング ページの一番下にある [Show Incomplete Applications] をクリックすると、現在実行中のすべてのジョブを表示できます。

9. バックフィル ジョブの実行

次に、データをメモリに読み込み、必要な情報を抽出して、出力を Google Cloud Storage バケットにダンプするジョブを実行します。各 Reddit コメントの「タイトル」、「本文」(未加工のテキスト)、「作成タイムスタンプ」を抽出します。このデータを取得し、CSV に変換して圧縮し、URI が gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz のバケットに読み込みます。

Cloud エディタを再度参照して、cloud-dataproc/codelabs/spark-bigquery/backfill.shコードを確認します。これは、cloud-dataproc/codelabs/spark-bigquery/backfill.pyコードを実行するラッパー スクリプトです。

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

ジョブ完了メッセージがすぐに表示されます。ジョブが完了するまでに最大 15 分かかることがあります。gsutil を使用して、ストレージ バケットを再確認し、データ出力が成功したことを確認することもできます。すべてのジョブが完了したら、次のコマンドを実行します。

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

次の出力が表示されます。

a7c3c7b2e82f9fca.png

Reddit のコメントデータのバックフィルが正常に完了しました。このデータに基づいてモデルを構築する方法については、Spark-NLP Codelab をご覧ください。

10. クリーンアップ

このクイックスタートの完了後に GCP アカウントに不要な料金が発生しないようにするには:

  1. 作成した環境の Cloud Storage バケットを削除します
  2. Dataproc 環境を削除します

この Codelab 専用のプロジェクトを作成した場合は、必要に応じてプロジェクトを削除することもできます。

  1. GCP Console で、[プロジェクト] ページに移動します。
  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ボックスにプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

ライセンス

この作品は、クリエイティブ・コモンズの表示 3.0 汎用ライセンスと Apache 2.0 ライセンスにより使用許諾されています。