1. 概要
このラボでは、Cloud Dataproc で Apache Spark と Jupyter ノートブックを設定して使用する方法について説明します。
Jupyter ノートブックは、コードをインタラクティブに実行して結果をすぐに確認できるため、探索的データ分析や機械学習モデルの構築に広く使用されています。
ただし、Apache Spark と Jupyter ノートブックの設定と使用は複雑になる可能性があります。

Cloud Dataproc では、Apache Spark、Jupyter コンポーネント、コンポーネント ゲートウェイを使用して Dataproc クラスタを約 90 秒で作成できるため、このプロセスを迅速かつ簡単に行うことができます。
学習内容
この Codelab では、以下について学びます。
- クラスタ用の Google Cloud Storage バケットを作成する
- Jupyter とコンポーネント ゲートウェイを使用して Dataproc クラスタを作成します。
- Dataproc で JupyterLab ウェブ UI にアクセスする
- Spark BigQuery Storage コネクタを使用してノートブックを作成する
- Spark ジョブを実行して結果をプロットする。
Google Cloud でこのラボを実行するための総費用は約 $1 です。Cloud Dataproc の料金の詳細については、こちらをご覧ください。
2. プロジェクトを作成する
console.cloud.google.com で Google Cloud Platform コンソールにログインし、新しいプロジェクトを作成します。



次に、Google Cloud リソースを使用するために、Cloud Console で課金を有効にする必要があります。
この Codelab の操作をすべて行って、費用が生じたとしても、少額です。ただし、リソースの使用量を増やしたり、実行したままにしたりすると、費用が増加する可能性があります。この Codelab の最後のセクションでは、プロジェクトのクリーンアップについて説明します。
Google Cloud Platform の新規ユーザーは、$300 の無料トライアルをご利用いただけます。
3. 環境を設定する
まず最初に Cloud コンソールの右上にあるボタンをクリックして Cloud Shell を設定します。

Cloud Shell が読み込まれたら、次のコマンドを実行して、前の手順で取得したプロジェクト ID を設定します。
gcloud config set project <project_id>
プロジェクト ID は、クラウド コンソールの左上にあるプロジェクトをクリックしても確認できます。


次に、Dataproc、Compute Engine、BigQuery Storage API を有効にします。
gcloud services enable dataproc.googleapis.com \
compute.googleapis.com \
storage-component.googleapis.com \
bigquery.googleapis.com \
bigquerystorage.googleapis.com
または、Cloud Console で行うこともできます。画面の左上にあるメニュー アイコンをクリックします。

プルダウンから [API Manager] を選択します。

[API とサービスの有効化] をクリックします。

次の API を検索して有効にします。
- Compute Engine API
- Dataproc API
- BigQuery API
- BigQuery Storage API
4. GCS バケットを作成する
データに最も近いリージョンに Google Cloud Storage バケットを作成し、一意の名前を付けます。
これは Dataproc クラスタで使用されます。
REGION=us-central1
BUCKET_NAME=<your-bucket-name>
gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}
次のような内容が出力されます
Creating gs://<your-bucket-name>/...
5. Jupyter とコンポーネント ゲートウェイで Dataproc クラスタを作成する
クラスタの作成
クラスタの環境変数を設定する
REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>
次に、この gcloud コマンドを実行して、クラスタで Jupyter を操作するために必要なすべてのコンポーネントでクラスタを作成します。
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--region=${REGION} \
--image-version=1.4 \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--bucket=${BUCKET_NAME} \
--optional-components=ANACONDA,JUPYTER \
--enable-component-gateway
クラスタの作成中に次の出力が表示されます。
Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...
クラスタの作成には約 90 秒かかります。準備が整うと、Dataproc Cloud コンソール UI からクラスタにアクセスできるようになります。
待機中に、以下の説明を読んで、gcloud コマンドで使用されるフラグの詳細を確認してください。
クラスタが作成されると、次の出力が表示されます。
Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].
gcloud dataproc create コマンドで使用されるフラグ
gcloud dataproc create コマンドで使用されるフラグの内訳は次のとおりです。
--region=${REGION}
クラスタが作成されるリージョンとゾーンを指定します。利用可能なリージョンのリストはこちらで確認できます。
--image-version=1.4
クラスタで使用するイメージのバージョン。利用可能なバージョンのリストはこちらをご覧ください。
--bucket=${BUCKET_NAME}
クラスタで使用するために、先ほど作成した Google Cloud Storage バケットを指定します。GCS バケットを指定しない場合は、自動的に作成されます。
GCS バケットは削除されないため、クラスタを削除してもノートブックはここに保存されます。
--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4
Dataproc クラスタに使用するマシンタイプ。使用可能なマシンタイプの一覧はこちらをご覧ください。
フラグ –num-workers を設定しない場合、デフォルトで 1 つのマスターノードと 2 つのワーカーノードが作成されます。
--optional-components=ANACONDA,JUPYTER
オプション コンポーネントにこれらの値を設定すると、Jupyter と Anaconda(Jupyter ノートブックに必要)に必要なすべてのライブラリがクラスタにインストールされます。
--enable-component-gateway
コンポーネント ゲートウェイを有効にすると、Apache Knox と Inverting Proxy を使用して App Engine リンクが作成されます。これにより、Jupyter と JupyterLab のウェブ インターフェースに簡単、安全、認証済みのアクセスが可能になり、SSH トンネルを作成する必要がなくなります。
また、クラスタ上の他のツール(Yarn Resource Manager や Spark History Server など)へのリンクも作成されます。これらのツールは、ジョブのパフォーマンスやクラスタの使用パターンを確認するのに役立ちます。
6. Apache Spark ノートブックを作成する
JupyterLab ウェブ インターフェースへのアクセス
クラスタの準備ができたら、Dataproc クラスタ - Cloud コンソールに移動し、作成したクラスタをクリックして、ウェブ インターフェース タブに移動し、JupyterLab ウェブ インターフェースへのコンポーネント ゲートウェイのリンクを見つけることができます。

Jupyter(従来のノートブック インターフェース)または JupyterLab(Project Jupyter の次世代 UI)にアクセスできることがわかります。
JupyterLab には多くの優れた新しい UI 機能があります。ノートブックを初めて使用する場合や、最新の改善点を探している場合は、JupyterLab を使用することをおすすめします。公式ドキュメントによると、JupyterLab は最終的に従来の Jupyter インターフェースに置き換わる予定です。
Python 3 カーネルでノートブックを作成する

ランチャー タブで Python 3 ノートブック アイコンをクリックし、Python 3 カーネル(PySpark カーネルではない)でノートブックを作成します。これによりノートブックで SparkSession を構成し、BigQuery Storage API を使用するのに必要な spark-bigquery-connector を含めることができます。
ノートブックの名前を変更する

左側のサイドバーまたは上部のナビゲーションでノートブック名を右クリックし、ノートブックの名前を「BigQuery Storage & Spark DataFrames.ipynb」に変更します。
ノートブックで Spark コードを実行する

このノートブックでは spark-bigquery-connector を使用します。これは BigQuery Storage API を活用する BigQuery と Spark の間でデータの読み取りと書き込みをするツールです。
BigQuery Storage API は、RPC ベースのプロトコルを使用して BigQuery のデータにアクセスすることで、大幅な改善を実現します。並列でのデータの読み取りと書き込み、Apache Avro や Apache Arrow などのさまざまなシリアル化形式をサポートしています。大まかに言うと、これによりパフォーマンスが大幅に向上します。特に大規模なデータセットで効果があります。
最初のセルでクラスタの Scala バージョンを確認し、正しいバージョンの spark-bigquery-connector jar を含めるようにします。
入力 [1]:
!scala -version
出力 [1]:
Spark セッションを作成し、spark-bigquery-connector パッケージを含めます。
Scala バージョンが 2.11 の場合は、次のパッケージを使用します。
com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta
Scala のバージョンが 2.12 の場合は、次のパッケージを使用します。
com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta
入力 [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('BigQuery Storage & Spark DataFrames') \
.config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
.getOrCreate()
repl.eagerEval を有効にする
これにより、df.show() を表示しなくても各ステップで DataFrames の結果が出力され、出力のフォーマットも改善されます。
入力 [3]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)
BigQuery テーブルを Spark DataFrame に読み取る
一般公開された BigQuery データセットからデータを読み取って Spark DataFrame を作成します。これで spark-bigquery-connector と BigQuery Storage API を使いデータを Spark クラスタに読み込めます。
Spark DataFrame を作成し、Wikipedia ページビューの BigQuery 一般公開データセットからデータを読み込みます。spark-bigquery-connector を使用してデータを Spark に読み込み、そこでデータの処理が行われるため、データに対してクエリを実行していないことに注意してください。このコードを実行しても、Spark の遅延評価であるため、実際にはテーブルは読み込まれません。実行は次のステップで行われます。
入力 [4]:
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_pageviews.printSchema()
出力 [4]:

必要な列を選択し、filter() のエイリアスである where() を使用してフィルタを適用します。
このコードを実行すると、Spark アクションがトリガーされ、この時点でデータは BigQuery Storage から読み取られます。
入力 [5]:
df_wiki_en = df_wiki_pageviews \
.select("datehour", "wiki", "views") \
.where("views > 1000 AND wiki in ('en', 'en.m')") \
df_wiki_en
出力 [5]:

タイトル別にグループ分けし、ページビュー順に並べてトップ ページを確認する
入力 [6]:
import pyspark.sql.functions as F
df_datehour_totals = df_wiki_en \
.groupBy("datehour") \
.agg(F.sum('views').alias('total_views'))
df_datehour_totals.orderBy('total_views', ascending=False)
出力 [6]:
7. ノートブックで Python プロット ライブラリを使用する
Python で使用可能なさまざまなプロット ライブラリを使用して、Spark ジョブの出力をプロットできます。
Spark DataFrame を Pandas DataFrame に変換する
Spark DataFrame を Pandas DataFrame に変換し、datehour をインデックスとして設定します。これは、Python でデータを直接操作し、利用可能な多くの Python プロット ライブラリを使用してデータをプロットする場合に便利です。
入力 [7]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()
pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()
出力 [7]:

Pandas DataFrame のプロット
ノートブックにプロットを表示するために必要な matplotlib ライブラリをインポートします。
入力 [8]:
import matplotlib.pyplot as plt
Pandas のプロット関数を使用して、Pandas DataFrame から折れ線グラフを作成します。
入力 [9]:
pandas_datehour_totals.plot(kind='line',figsize=(12,6));
出力 [9]:
ノートブックが GCS に保存されたことを確認する
これで、最初の Jupyter ノートブックが Dataproc クラスタで実行されるようになりました。ノートブックに名前を付けると、クラスタの作成時に使用された GCS バケットに自動的に保存されます。
これは、Cloud Shell で次の gsutil コマンドを使用して確認できます。
BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter
次のような内容が出力されます
gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb
8. 最適化のヒント - データをメモリにキャッシュ保存する
BigQuery Storage から毎回読み取るのではなく、メモリ内のデータを使用するシナリオもあります。
このジョブは BigQuery からデータを読み取り、フィルタを BigQuery に push します。集計は Apache Spark で計算されます。
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_en = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10 AND wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
上記のジョブを変更してテーブルのキャッシュを含めることができます。これにより、Apache Spark によって wiki 列のフィルタがメモリに適用されます。
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_all = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10")
# cache the data in memory
df_wiki_all.cache()
df_wiki_en = df_wiki_all \
.where("wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
これにより、BigQuery ストレージからデータを再度読み取るのではなく、キャッシュに保存されたデータを使用して別の Wiki 言語をフィルタリングできるため、処理が大幅に高速化されます。
df_wiki_de = df_wiki_all \
.where("wiki in ('de', 'de.m')")
df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_de_totals.orderBy('total_views', ascending=False)
キャッシュを削除するには、
df_wiki_all.unpersist()
9. その他のユースケースのノートブックの例
Cloud Dataproc GitHub リポジトリには、さまざまな Google Cloud Platform プロダクトとオープンソース ツールを使用して、データの読み込み、データの保存、データのプロットを行うための一般的な Apache Spark パターンを含む Jupyter ノートブックが用意されています。
10. クリーンアップ
このクイックスタートの完了後に GCP アカウントに不要な料金が発生しないようにするには:
この Codelab 専用のプロジェクトを作成した場合は、必要に応じてプロジェクトを削除することもできます。
- GCP Console で、[プロジェクト] ページに移動します。
- プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
- ボックスにプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。
ライセンス
この作品は、クリエイティブ・コモンズの表示 3.0 汎用ライセンスと Apache 2.0 ライセンスにより使用許諾されています。