Cloud Dataproc の Apache Spark および Jupyter Notebooks

1. 概要

このラボでは、Cloud Dataproc で Apache SparkJupyter ノートブックを設定して使用する方法について学習します。

Jupyter ノートブックは、コードをインタラクティブに実行して結果をすぐに確認できるため、探索的データ分析や ML モデルの構築に広く使用されています。

しかし、Apache Spark と Jupyter Notebook の設定と使用は複雑な場合があります。

b9ed855863c57d6.png

Cloud Dataproc では、Apache Spark、Jupyter コンポーネントコンポーネント ゲートウェイを使用して Dataproc クラスタを約 90 秒で作成できるため、これを迅速かつ簡単に作成できます。

学習内容

この Codelab では、以下について学びます。

  • クラスタ用の Google Cloud Storage バケットを作成する
  • Jupyter とコンポーネント ゲートウェイを使用して Dataproc クラスタを作成する
  • Dataproc で JupyterLab ウェブ UI にアクセスする
  • Spark BigQuery ストレージ コネクタを利用してノートブックを作成する
  • Spark ジョブを実行し、結果をプロットする。

このラボを Google Cloud で実行するための総費用は約 $1 です。Cloud Dataproc の料金について詳しくは、こちらをご覧ください。

2. プロジェクトの作成

console.cloud.google.com で Google Cloud Platform コンソールにログインし、新しいプロジェクトを作成します。

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

次に、Google Cloud リソースを使用するために、Cloud コンソールで課金を有効にする必要があります。

この Codelab を実行するために必要な費用は数ドル以上です。ただし、使用するリソースを増やす場合や、リソースを実行したままにする場合は、コストが高くなる可能性があります。この Codelab の最後のセクションでは、プロジェクトのクリーンアップについて説明します。

Google Cloud Platform の新規ユーザーは、$300 分の無料トライアルをご利用いただけます。

3. 環境の設定

まず最初に Cloud コンソールの右上にあるボタンをクリックして Cloud Shell を設定します。

a10c47ee6ca41c54.png

Cloud Shell が読み込まれたら、次のコマンドを実行して、前のステップのプロジェクト ID を設定します。**

gcloud config set project <project_id>

プロジェクト ID は、Cloud コンソールの左上にあるプロジェクトをクリックして確認することもできます。

b4b233632ce0c3c4.png

c7e39ffc6dec3765.png

次に、Dataproc、Compute Engine、BigQuery Storage API を有効にします。

gcloud services enable dataproc.googleapis.com \
  compute.googleapis.com \
  storage-component.googleapis.com \
  bigquery.googleapis.com \
  bigquerystorage.googleapis.com

この操作は Cloud コンソールで行うこともできます。画面の左上にあるメニュー アイコンをクリックします。

2bfc27ef9ba2ec7d.png

プルダウンから [API Manager] を選択します。

408af5f32c4b7c25.png

[API とサービスの有効化] をクリックします。

a9c0e84296a7ba5b.png

次の API を検索して有効にします。

  • Compute Engine API
  • Dataproc API
  • BigQuery API
  • BigQuery Storage API

4. GCS バケットを作成する

データに最も近いリージョンに Google Cloud Storage バケットを作成し、一意の名前を付けます。

これは Dataproc クラスタで使用されます。

REGION=us-central1
BUCKET_NAME=<your-bucket-name>

gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}

次のような出力が表示されます。

Creating gs://<your-bucket-name>/...

5. Jupyter とコンポーネント ゲートウェイ

クラスタを作成しています

クラスタの環境変数を設定する

REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>

次に、この gcloud コマンドを実行して、クラスタで Jupyter を操作するために必要なすべてのコンポーネントを含むクラスタを作成します。

gcloud beta dataproc clusters create ${CLUSTER_NAME} \
 --region=${REGION} \
 --image-version=1.4 \
 --master-machine-type=n1-standard-4 \
 --worker-machine-type=n1-standard-4 \
 --bucket=${BUCKET_NAME} \
 --optional-components=ANACONDA,JUPYTER \
 --enable-component-gateway 

クラスタの作成中は、次の出力が表示されます。

Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...

クラスタの作成には 90 秒ほどかかります。作成が完了すると、Dataproc Cloud コンソール UI からクラスタにアクセスできるようになります。

待っている間は、以下に進んでください。gcloud コマンドで使用されるフラグの詳細を確認してください。

クラスタが作成されると、次の出力が表示されます。

Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].

gcloud dataproc create コマンドで使用されるフラグ

gcloud dataproc create コマンドで使用されるフラグの詳細は次のとおりです。

--region=${REGION}

クラスタを作成するリージョンとゾーンを指定します。利用可能なリージョンのリストについては、こちらをご覧ください。

--image-version=1.4

クラスタで使用するイメージのバージョン。利用可能なバージョンのリストについては、こちらをご覧ください。

--bucket=${BUCKET_NAME}

先ほど作成した Google Cloud Storage バケットをクラスタ用に指定します。GCS バケットを指定しない場合は、自動的に作成されます。

また、GCS バケットは削除されないため、クラスタを削除しても、ノートブックはここに保存されます。

--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4

Dataproc クラスタに使用するマシンタイプ。利用可能なマシンタイプのリストについては、こちらをご覧ください。

–num-workers フラグを設定しない場合は、デフォルトで 1 つのマスターノードと 2 つのワーカーノードが作成されます。

--optional-components=ANACONDA,JUPYTER

オプションのコンポーネントにこれらの値を設定すると、Jupyter と Anaconda に必要なすべてのライブラリ(Jupyter ノートブックに必須)がクラスタにインストールされます。

--enable-component-gateway

[コンポーネント ゲートウェイ] を有効にすると、Apache Knox と Inverting Proxy を使用して App Engine のリンクが作成されます。これにより、Jupyter と JupyterLab のウェブ インターフェースに簡単かつ安全、かつ認証されたアクセスが可能になるため、SSH トンネルを作成する必要がなくなります。

また、Yarn Resource Manager や Spark History Server など、クラスタ上の他のツールへのリンクも作成されます。これらは、ジョブのパフォーマンスやクラスタの使用パターンを確認するのに役立ちます。

6. Apache Spark ノートブックを作成する

JupyterLab ウェブ インターフェースにアクセスする

クラスタの準備ができたら、[Dataproc クラスタ - Cloud コンソール] に移動し、作成したクラスタをクリックして [ウェブ インターフェース] タブに移動して、JupyterLab ウェブ インターフェースへのコンポーネント ゲートウェイ リンクを確認できます。

afc40202d555de47.png

お気づきかもしれませんが、従来のノートブック インターフェースである Jupyter、またはプロジェクト Jupyter の次世代 UI である JupyterLab にアクセスできます。

JupyterLab には優れた新しい UI 機能が多数用意されているため、ノートブックを初めて使用する場合や最新の改善点を探している場合は、公式のドキュメントによると、最終的には従来の Jupyter インターフェースを置き換えるため、JupyterLab を使用することをおすすめします。

Python 3 カーネルでノートブックを作成する

a463623f2ebf0518.png

[Launcher] タブで Python 3 ノートブック アイコンをクリックして、Python 3 カーネル(PySpark カーネルではない)でノートブックを作成します。これにより、ノートブックで SparkSession を構成し、BigQuery Storage API を使用するために必要な spark-bigquery-connector を含めることができます。

ノートブックの名前を変更する

196a3276ed07e1f3.png

左側のナビゲーションまたは上部のサイドバーにあるノートブック名を右クリックし、ノートブックの名前を「BigQuery Storage &Spark DataFrames.ipynb」と

ノートブックで Spark コードを実行する

fbac38062e5bb9cf.png

このノートブックでは、spark-bigquery-connector を使用します。これは BigQuery Storage API を利用して、BigQuery と Spark の間でデータの読み取りと書き込みを行うためのツールです。

BigQuery Storage API により、RPC ベースのプロトコルを使用した BigQuery 内のデータへのアクセスが大幅に改善されます。データの読み取りと書き込みを並行して行うだけでなく、Apache AvroApache Arrow などのさまざまなシリアル化形式にも対応しています。大まかに言うと、特に大規模なデータセットでは、パフォーマンスが大幅に向上します。

最初のセルでクラスタの Scala バージョンを確認し、正しいバージョンの spark-bigquery-connector jar を追加できるようにします。

入力 [1]:

!scala -version

出力 [1]:f580e442576b8b1f.png Spark セッションを作成し、spark-bigquery-connector パッケージを含めます。

Scala のバージョンが 2.11 の場合は、次のパッケージを使用します。

com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta

Scala のバージョンが 2.12 の場合は、次のパッケージを使用します。

com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta

入力 [2]:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
 .appName('BigQuery Storage & Spark DataFrames') \
 .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
 .getOrCreate()

repl.eagerEval を有効にする

これにより、各ステップの DataFrame の結果が出力されます。df.show() を表示する必要がなく、出力の形式も改善されます。

入力 [3]:

spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

BigQuery テーブルを Spark DataFrame に読み込む

一般公開の BigQuery データセットからデータを読み取って Spark DataFrame を作成する。ここでは、spark-bigquery-connector と BigQuery Storage API を使用して Spark クラスタにデータを読み込みます。

Spark DataFrame を作成し、Wikipedia ページビュー用の BigQuery 一般公開データセットからデータを読み込みます。今回は、データに対してクエリを実行していないことがわかります。spark-bigquery-connector を使用してデータを Spark に読み込み、そこでデータの処理が行われるということです。このコードを実行しても、Spark では遅延評価であり、実行は次のステップで実行されるため、実際にはテーブルは読み込まれません。

入力 [4]:

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
  .load()

df_wiki_pageviews.printSchema()

出力 [4]:

c107a33f6fc30ca.png

必要な列を選択し、filter() のエイリアスである where() を使用してフィルタを適用します。

このコードを実行すると、Spark アクションがトリガーされ、この時点で BigQuery ストレージからデータが読み取られます。

入力 [5]:

df_wiki_en = df_wiki_pageviews \
  .select("datehour", "wiki", "views") \
  .where("views > 1000 AND wiki in ('en', 'en.m')") \

df_wiki_en

出力 [5]:

ad363cbe510d625a.png

タイトルでグループ化し、ページビューで並べ替えて上位のページを表示する

入力 [6]:

import pyspark.sql.functions as F

df_datehour_totals = df_wiki_en \
  .groupBy("datehour") \
  .agg(F.sum('views').alias('total_views'))

df_datehour_totals.orderBy('total_views', ascending=False)

出力 [6]:f718abd05afc0f4.png

7. ノートブックで Python プロット ライブラリを使用する

Python で利用可能なさまざまなプロット ライブラリを利用して、Spark ジョブの出力をプロットできます。

Spark DataFrame を Pandas DataFrame に変換する

Spark DataFrame を Pandas DataFrame に変換し、日時をインデックスとして設定します。これは、Python で直接データを扱い、多くの Python のプロット ライブラリを使用してデータをプロットする場合に便利です。

入力 [7]:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()

pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()

出力 [7]:

3df2aaa2351f028d.png

Pandas DataFrame のプロット

ノートブックにプロットを表示するために必要な matplotlib ライブラリをインポートします。

入力 [8]:

import matplotlib.pyplot as plt

Pandas プロット関数を使用して、Pandas DataFrame から折れ線グラフを作成します。

入力 [9]:

pandas_datehour_totals.plot(kind='line',figsize=(12,6));

出力 [9]:bade7042c3033594.png

ノートブックが GCS に保存されたことを確認する

これで、Dataproc クラスタで最初の Jupyter ノートブックが稼働するようになりました。ノートブックに名前を付けると、クラスタの作成時に使用した GCS バケットに自動的に保存されます。

これは、Cloud Shell で次の gsutil コマンドを使用して確認できます。

BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter

次のような出力が表示されます。

gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb

8. 最適化のヒント - メモリにデータをキャッシュする

BigQuery ストレージから毎回読み取るのではなく、メモリ内のデータを読み取ろうとするシナリオもあります。

このジョブは BigQuery からデータを読み取り、フィルタを BigQuery に push します。その集計は Apache Spark で計算されます。

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_en = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10 AND wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

テーブルのキャッシュを含めるように上記のジョブを変更すると、Apache Spark によって wiki 列のフィルタがメモリに適用されます。

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_all = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10")

# cache the data in memory
df_wiki_all.cache()

df_wiki_en = df_wiki_all \
 .where("wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

その後、BigQuery ストレージからデータを再度読み取る代わりに、キャッシュされたデータを使用して別の Wiki 言語をフィルタリングできます。これにより、実行速度が大幅に向上します。

df_wiki_de = df_wiki_all \
 .where("wiki in ('de', 'de.m')")

df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_de_totals.orderBy('total_views', ascending=False)

次のコマンドを実行して、キャッシュを削除できます。

df_wiki_all.unpersist()

9. その他のユースケースに使用するノートブックの例

Cloud Dataproc GitHub リポジトリには、さまざまな Google Cloud Platform プロダクトやオープンソース ツールを使用してデータの読み込み、データの保存、データのプロットを行うための一般的な Apache Spark パターンを備えた Jupyter ノートブックがあります。

10. クリーンアップ

このクイックスタートの完了後に GCP アカウントに不要な料金が発生しないようにするには:

  1. 作成した環境の Cloud Storage バケットを削除します。
  2. Dataproc 環境を削除します

この Codelab 専用のプロジェクトを作成した場合は、必要に応じてプロジェクトを削除することもできます。

  1. GCP コンソールで [プロジェクト] ページに移動します。
  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ボックスにプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

ライセンス

このソフトウェアは、クリエイティブ・コモンズの表示 3.0 汎用ライセンス、および Apache 2.0 ライセンスにより使用許諾されています。