1. 概要
この Codelab では、Apache Spark と Google Cloud Platform の Dataproc を使用してデータ処理パイプラインを作成する方法について説明します。データ サイエンスやデータ エンジニアリングでは、あるストレージの場所からデータを読み取り、そのデータに対して変換を行い、別のストレージの場所に書き込むのが一般的なユースケースです。一般的な変換には、データの内容の変更、不要な情報の削除、ファイル形式の変更などがあります。
この Codelab では、Apache Spark について学び、Dataproc と PySpark(Apache Spark の Python API)、BigQuery、Google Cloud Storage、Reddit のデータを使用してサンプル パイプラインを実行します。
2. Apache Spark の概要(省略可)
同ウェブサイトによると、「Apache Spark は、大規模なデータ処理のための統合分析エンジンです。"メモリ内で並列にデータを分析して処理できるため、複数の異なるマシンやノード間で大規模な並列計算を行うことができます。当初は従来の MapReduce のアップグレードとして 2014 年にリリースされましたが、今でも大規模なコンピューティングを実行するための最も一般的なフレームワークの一つです。Apache Spark は Scala で記述されており、その後、Scala、Java、Python、R の API が用意されています。データに対して SQL クエリを実行する Spark SQL、ストリーミング データ用の Spark Streaming、ML 用の MLlib、グラフ処理用の GraphX など、多数のライブラリが含まれており、これらはすべて Apache Spark エンジンで実行されます。
Spark は単独で実行することも、スケーリングのために Yarn、Mesos、Kubernetes などのリソース管理サービスを活用することもできます。この Codelab では、Yarn を利用する Dataproc を使用します。
Spark のデータは元々、RDD(復元力の高い分散データセット)と呼ばれるものにメモリに読み込まれていました。Spark での開発には、2 つの新しいカラム型データ型(型付き Dataset と型なし Dataframe)が追加されています。大まかに言うと、RDD はあらゆるタイプのデータに適していますが、データセットと DataFrame は表形式データに最適化されています。データセットは Java API と Scala API でのみ使用できるため、この Codelab では PySpark Dataframe API を使用します。詳細については、Apache Spark のドキュメントをご覧ください。
3. ユースケース
多くの場合、データ エンジニアは、データ サイエンティストがデータに簡単にアクセスできることを必要としています。しかし、データは最初はダーティ(現状のままの分析に使用することは困難)であることが多く、実際に活用するにはクリーニングが必要です。たとえば、ウェブからスクレイピングされたデータには、奇妙なエンコーディングや無関係な HTML タグが含まれている可能性があります。
このラボでは、BigQuery から Dataproc でホストされている Spark クラスタに Reddit の投稿の形式で一連のデータを読み込み、有用な情報を抽出して、処理されたデータを ZIP 形式の CSV ファイルとして Google Cloud Storage に保存します。
あなたの会社の最高データ サイエンティストは、各チームが自然言語処理のさまざまな問題に取り組むことに関心を持っています。具体的には、サブレディット「r/food」のデータの分析に関心があります。2017 年 1 月から 2019 年 8 月までのバックフィルから始まるデータダンプのパイプラインを作成します。
4. BigQuery Storage API を介した BigQuery へのアクセス
tabledata.list API メソッドを使用して BigQuery からデータを取得すると、時間がかかり、データ量が増えるため効率的でないことが判明する場合があります。このメソッドは JSON オブジェクトのリストを返します。データセット全体を読み取るには、一度に 1 ページずつ順次読み取る必要があります。
BigQuery Storage API を使用すると、RPC ベースのプロトコルを使用した BigQuery 内のデータへのアクセスが大幅に改善されます。データの読み取りと書き込みを並行して行うだけでなく、Apache Avro や Apache Arrow などのさまざまなシリアル化形式にも対応しています。大まかに言うと、特に大規模なデータセットでは、パフォーマンスが大幅に向上します。
この Codelab では、spark-bigquery-connector を使用して、BigQuery と Spark の間でデータの読み取りと書き込みを行います。
5. プロジェクトの作成
Google Cloud Platform コンソール(console.cloud.google.com)にログインし、新しいプロジェクトを作成します。
次に、Google Cloud リソースを使用するために、Cloud コンソールで課金を有効にする必要があります。
この Codelab を実行するために必要な費用は数ドル以上です。ただし、使用するリソースを増やす場合や、リソースを実行したままにする場合は、コストが高くなる可能性があります。この Codelab の最後のセクションでは、プロジェクトのクリーンアップについて説明します。
Google Cloud Platform の新規ユーザーは、$300 分の無料トライアルをご利用いただけます。
6. 環境の設定
次に、以下の方法で環境を設定します。
- Compute Engine、Dataproc、BigQuery Storage API の有効化
- プロジェクト設定の構成
- Dataproc クラスタの作成
- Google Cloud Storage バケットを作成する
API を有効にして環境を構成する
Cloud コンソールの右上にあるボタンをクリックして、Cloud Shell を開きます。
Cloud Shell が読み込まれたら、次のコマンドを実行して Compute Engine、Dataproc、BigQuery Storage API を有効にします。
gcloud services enable compute.googleapis.com \
dataproc.googleapis.com \
bigquerystorage.googleapis.com
プロジェクトのプロジェクト ID を設定します。プロジェクトの選択ページに移動して、該当するプロジェクトを検索します。プロジェクト名は異なる場合があります。
次のコマンドを実行して、プロジェクト ID を設定します。
gcloud config set project <project_id>
こちらのリストから 1 つ選択して、プロジェクトのリージョンを設定します。たとえば、us-central1
のようにします。
gcloud config set dataproc/region <region>
Dataproc クラスタの名前を選択し、その環境変数を作成します。
CLUSTER_NAME=<cluster_name>
Dataproc クラスタの作成
次のコマンドを実行して、Dataproc クラスタを作成します。
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--worker-machine-type n1-standard-8 \
--num-workers 8 \
--image-version 1.5-debian \
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
--metadata 'PIP_PACKAGES=google-cloud-storage' \
--optional-components=ANACONDA \
--enable-component-gateway
このコマンドが完了するまでに数分かかります。コマンドを分割するには:
先ほど指定した名前で Dataproc クラスタの作成が開始されます。beta
API を使用すると、コンポーネント ゲートウェイなどの Dataproc のベータ版機能が有効になります。
gcloud beta dataproc clusters create ${CLUSTER_NAME}
これにより、ワーカーに使用するマシンのタイプが設定されます。
--worker-machine-type n1-standard-8
これにより、クラスタのワーカー数が設定されます。
--num-workers 8
これにより、Dataproc のイメージ バージョンが設定されます。
--image-version 1.5-debian
これにより、クラスタで使用する初期化アクションが構成されます。ここでは、pip 初期化アクションを含めています。
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh
これは、クラスタに含めるメタデータです。ここでは、pip
初期化アクションのメタデータを指定しています。
--metadata 'PIP_PACKAGES=google-cloud-storage'
これにより、クラスタにインストールするオプション コンポーネントが設定されます。
--optional-components=ANACONDA
これによりコンポーネント ゲートウェイが有効になり、Dataproc のコンポーネント ゲートウェイを使用して Zeppelin、Jupyter、Spark 履歴などの一般的な UI を表示できるようになります。
--enable-component-gateway
Dataproc の概要について詳しくは、こちらの Codelab をご覧ください。
Google Cloud Storage バケットを作成する
ジョブ出力には Google Cloud Storage バケットが必要です。バケットの一意の名前を確認し、次のコマンドを実行して新しいバケットを作成します。バケット名はすべてのユーザーのすべての Google Cloud プロジェクトで一意であるため、異なる名前でこれを何度か試すことが必要になる場合があります。ServiceException
を受信しない場合は、バケットは正常に作成されています。
BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}
7. 探索的データ分析
前処理を実行する前に、処理するデータの性質について詳しく知る必要があります。そのために、2 つのデータ探索方法を見ていきます。まず、BigQuery ウェブ UI を使用して元データを表示し、PySpark と Dataproc を使用してサブレディットごとの投稿数を計算します。
BigQuery ウェブ UI の使用
まず、BigQuery ウェブ UI を使用してデータを表示します。Cloud コンソールのメニュー アイコンから下にスクロールして、[BigQuery] をクリックします。BigQuery ウェブ UI を開きます
次に、BigQuery ウェブ UI のクエリエディタで次のコマンドを実行します。すると、2017 年 1 月のデータの完全な 10 行が返されます。
select * from fh-bigquery.reddit_posts.2017_01 limit 10;
ページをスクロールすると、使用可能なすべての列といくつかの例を確認できます。特に、各投稿のテキスト コンテンツを表す「title」という 2 つの列が表示されます。「selftext」は投稿の本文です「created_utc」などの列もこれは投稿が作成された日時と “subreddit”ですこれは投稿が存在するサブレディットです
PySpark ジョブの実行
Cloud Shell で次のコマンドを実行して、サンプルコードを含むリポジトリのクローンを作成し、正しいディレクトリに移動します。
cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
PySpark を使用して、サブレディットごとに投稿数を計算できます。次の手順で実行する前に、Cloud Editor を開いてスクリプト cloud-dataproc/codelabs/spark-bigquery
を読みます。
[ターミナルを開く] をクリックします。] ボタンをクリックして Cloud Shell に戻り、次のコマンドを実行して最初の PySpark ジョブを実行します。
cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
--jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
--driver-log-levels root=FATAL \
counts_by_subreddit.py
このコマンドを使用すると、Jobs API を介してジョブを Dataproc に送信できます。ここでは、ジョブタイプに pyspark
を指定しています。クラスタ名、オプションのパラメータ 、ジョブを含むファイルの名前を指定できます。ここでは、spark-bigquery-connector
をジョブに含めることができるパラメータ --jars
を指定しています。--driver-log-levels root=FATAL
を使用してログ出力レベルを設定することもできます。これにより、エラーを除くすべてのログ出力が抑制されます。Spark のログは、ノイズが多くなる傾向があります。
このコマンドの実行には数分かかり、最終的な出力は次のようになります。
8. Dataproc と Spark の UI の探索
Dataproc で Spark ジョブを実行する場合、2 つの UI にアクセスしてジョブ / クラスタのステータスを確認できます。1 つ目は Dataproc UI です。この UI は、メニュー アイコンをクリックして [Dataproc] まで下にスクロールすると見つかります。ここでは、現在使用可能なメモリ、保留中のメモリ、ワーカー数を確認できます。
[ジョブ] タブをクリックして、完了したジョブを表示することもできます。特定のジョブのジョブ ID をクリックすると、ジョブのログや出力など、ジョブの詳細を確認できます。
Spark UI を表示することもできます。ジョブページで戻る矢印をクリックし、[ウェブ インターフェース] をクリックします。コンポーネント ゲートウェイにいくつかのオプションが表示されます。これらの多くは、クラスタの設定時にオプション コンポーネントを使用して有効にできます。このラボでは、[Spark History Server] をクリックします。
次のウィンドウが開きます。
完了したジョブはすべてここに表示されます。application_id をクリックすると、ジョブの詳細を確認できます。同様に [未完了のアプリケーションを表示]を クリックして現在実行中のジョブをすべて表示できます。
9. バックフィル ジョブの実行
次に、メモリにデータを読み込み、必要な情報を抽出し、出力を Google Cloud Storage バケットにダンプするジョブを実行します。抽出したのは "title" と "body" です(未加工テキスト)と「timestamp created」確認できます。次に、このデータを取得して CSV に変換し、zip 形式で圧縮し、gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz という URI のバケットに読み込みます。
再度 Cloud Editor を参照して、cloud-dataproc/codelabs/spark-bigquery/backfill.sh
のコードを読むことができます。これは、cloud-dataproc/codelabs/spark-bigquery/backfill.py
でコードを実行するためのラッパー スクリプトです。
cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}
すぐにジョブ完了メッセージが多数表示されるはずです。ジョブが完了するまでに 15 分ほどかかる場合があります。また、gsutil を使用してストレージ バケットを再確認し、データ出力が成功していることを確認することもできます。すべてのジョブが完了したら、次のコマンドを実行します。
gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz
次の出力が表示されます。
これで、Reddit コメントデータのバックフィルが正常に完了しました。このデータに基づいてモデルを構築する方法に関心がある場合は、Spark-NLP の Codelab に進んでください。
10. クリーンアップ
このクイックスタートの完了後に GCP アカウントに不要な料金が発生しないようにするには:
- 作成した環境の Cloud Storage バケットを削除します。
- Dataproc 環境を削除します。
この Codelab 専用のプロジェクトを作成した場合は、必要に応じてプロジェクトを削除することもできます。
- GCP コンソールで [プロジェクト] ページに移動します。
- プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
- ボックスにプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。
ライセンス
このソフトウェアは、クリエイティブ・コモンズの表示 3.0 汎用ライセンス、および Apache 2.0 ライセンスにより使用許諾されています。