Cloud Dataproc の Apache Spark および Jupyter Notebooks

1. 概要

このラボでは、Cloud Dataproc で Apache SparkJupyter ノートブックを設定して使用する方法について説明します。

Jupyter ノートブックは、コードをインタラクティブに実行して結果をすぐに確認できるため、探索的データ分析や機械学習モデルの構築に広く使用されています。

ただし、Apache Spark と Jupyter ノートブックの設定と使用は複雑になる可能性があります。

b9ed855863c57d6.png

Cloud Dataproc では、Apache Spark、Jupyter コンポーネントコンポーネント ゲートウェイを使用して Dataproc クラスタを約 90 秒で作成できるため、このプロセスを迅速かつ簡単に行うことができます。

学習内容

この Codelab では、以下について学びます。

  • クラスタ用の Google Cloud Storage バケットを作成する
  • Jupyter とコンポーネント ゲートウェイを使用して Dataproc クラスタを作成します。
  • Dataproc で JupyterLab ウェブ UI にアクセスする
  • Spark BigQuery Storage コネクタを使用してノートブックを作成する
  • Spark ジョブを実行して結果をプロットする。

Google Cloud でこのラボを実行するための総費用は約 $1 です。Cloud Dataproc の料金の詳細については、こちらをご覧ください。

2. プロジェクトを作成する

console.cloud.google.com で Google Cloud Platform コンソールにログインし、新しいプロジェクトを作成します。

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

次に、Google Cloud リソースを使用するために、Cloud Console で課金を有効にする必要があります。

この Codelab の操作をすべて行って、費用が生じたとしても、少額です。ただし、リソースの使用量を増やしたり、実行したままにしたりすると、費用が増加する可能性があります。この Codelab の最後のセクションでは、プロジェクトのクリーンアップについて説明します。

Google Cloud Platform の新規ユーザーは、$300 の無料トライアルをご利用いただけます。

3. 環境を設定する

まず最初に Cloud コンソールの右上にあるボタンをクリックして Cloud Shell を設定します。

a10c47ee6ca41c54.png

Cloud Shell が読み込まれたら、次のコマンドを実行して、前の手順で取得したプロジェクト ID を設定します。

gcloud config set project <project_id>

プロジェクト ID は、クラウド コンソールの左上にあるプロジェクトをクリックしても確認できます。

b4b233632ce0c3c4.png

c7e39ffc6dec3765.png

次に、Dataproc、Compute Engine、BigQuery Storage API を有効にします。

gcloud services enable dataproc.googleapis.com \
  compute.googleapis.com \
  storage-component.googleapis.com \
  bigquery.googleapis.com \
  bigquerystorage.googleapis.com

または、Cloud Console で行うこともできます。画面の左上にあるメニュー アイコンをクリックします。

2bfc27ef9ba2ec7d.png

プルダウンから [API Manager] を選択します。

408af5f32c4b7c25.png

[API とサービスの有効化] をクリックします。

a9c0e84296a7ba5b.png

次の API を検索して有効にします。

  • Compute Engine API
  • Dataproc API
  • BigQuery API
  • BigQuery Storage API

4. GCS バケットを作成する

データに最も近いリージョンに Google Cloud Storage バケットを作成し、一意の名前を付けます。

これは Dataproc クラスタで使用されます。

REGION=us-central1
BUCKET_NAME=<your-bucket-name>

gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}

次のような内容が出力されます

Creating gs://<your-bucket-name>/...

5. Jupyter とコンポーネント ゲートウェイで Dataproc クラスタを作成する

クラスタの作成

クラスタの環境変数を設定する

REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>

次に、この gcloud コマンドを実行して、クラスタで Jupyter を操作するために必要なすべてのコンポーネントでクラスタを作成します。

gcloud beta dataproc clusters create ${CLUSTER_NAME} \
 --region=${REGION} \
 --image-version=1.4 \
 --master-machine-type=n1-standard-4 \
 --worker-machine-type=n1-standard-4 \
 --bucket=${BUCKET_NAME} \
 --optional-components=ANACONDA,JUPYTER \
 --enable-component-gateway 

クラスタの作成中に次の出力が表示されます。

Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...

クラスタの作成には約 90 秒かかります。準備が整うと、Dataproc Cloud コンソール UI からクラスタにアクセスできるようになります。

待機中に、以下の説明を読んで、gcloud コマンドで使用されるフラグの詳細を確認してください。

クラスタが作成されると、次の出力が表示されます。

Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].

gcloud dataproc create コマンドで使用されるフラグ

gcloud dataproc create コマンドで使用されるフラグの内訳は次のとおりです。

--region=${REGION}

クラスタが作成されるリージョンとゾーンを指定します。利用可能なリージョンのリストはこちらで確認できます。

--image-version=1.4

クラスタで使用するイメージのバージョン。利用可能なバージョンのリストはこちらをご覧ください。

--bucket=${BUCKET_NAME}

クラスタで使用するために、先ほど作成した Google Cloud Storage バケットを指定します。GCS バケットを指定しない場合は、自動的に作成されます。

GCS バケットは削除されないため、クラスタを削除してもノートブックはここに保存されます。

--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4

Dataproc クラスタに使用するマシンタイプ。使用可能なマシンタイプの一覧はこちらをご覧ください。

フラグ –num-workers を設定しない場合、デフォルトで 1 つのマスターノードと 2 つのワーカーノードが作成されます。

--optional-components=ANACONDA,JUPYTER

オプション コンポーネントにこれらの値を設定すると、Jupyter と Anaconda(Jupyter ノートブックに必要)に必要なすべてのライブラリがクラスタにインストールされます。

--enable-component-gateway

コンポーネント ゲートウェイを有効にすると、Apache Knox と Inverting Proxy を使用して App Engine リンクが作成されます。これにより、Jupyter と JupyterLab のウェブ インターフェースに簡単、安全、認証済みのアクセスが可能になり、SSH トンネルを作成する必要がなくなります。

また、クラスタ上の他のツール(Yarn Resource Manager や Spark History Server など)へのリンクも作成されます。これらのツールは、ジョブのパフォーマンスやクラスタの使用パターンを確認するのに役立ちます。

6. Apache Spark ノートブックを作成する

JupyterLab ウェブ インターフェースへのアクセス

クラスタの準備ができたら、Dataproc クラスタ - Cloud コンソールに移動し、作成したクラスタをクリックして、ウェブ インターフェース タブに移動し、JupyterLab ウェブ インターフェースへのコンポーネント ゲートウェイのリンクを見つけることができます。

afc40202d555de47.png

Jupyter(従来のノートブック インターフェース)または JupyterLab(Project Jupyter の次世代 UI)にアクセスできることがわかります。

JupyterLab には多くの優れた新しい UI 機能があります。ノートブックを初めて使用する場合や、最新の改善点を探している場合は、JupyterLab を使用することをおすすめします。公式ドキュメントによると、JupyterLab は最終的に従来の Jupyter インターフェースに置き換わる予定です。

Python 3 カーネルでノートブックを作成する

a463623f2ebf0518.png

ランチャー タブで Python 3 ノートブック アイコンをクリックし、Python 3 カーネル(PySpark カーネルではない)でノートブックを作成します。これによりノートブックで SparkSession を構成し、BigQuery Storage API を使用するのに必要な spark-bigquery-connector を含めることができます。

ノートブックの名前を変更する

196a3276ed07e1f3.png

左側のサイドバーまたは上部のナビゲーションでノートブック名を右クリックし、ノートブックの名前を「BigQuery Storage & Spark DataFrames.ipynb」に変更します。

ノートブックで Spark コードを実行する

fbac38062e5bb9cf.png

このノートブックでは spark-bigquery-connector を使用します。これは BigQuery Storage API を活用する BigQuery と Spark の間でデータの読み取りと書き込みをするツールです。

BigQuery Storage API は、RPC ベースのプロトコルを使用して BigQuery のデータにアクセスすることで、大幅な改善を実現します。並列でのデータの読み取りと書き込み、Apache AvroApache Arrow などのさまざまなシリアル化形式をサポートしています。大まかに言うと、これによりパフォーマンスが大幅に向上します。特に大規模なデータセットで効果があります。

最初のセルでクラスタの Scala バージョンを確認し、正しいバージョンの spark-bigquery-connector jar を含めるようにします。

入力 [1]:

!scala -version

出力 [1]:f580e442576b8b1f.png Spark セッションを作成し、spark-bigquery-connector パッケージを含めます。

Scala バージョンが 2.11 の場合は、次のパッケージを使用します。

com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta

Scala のバージョンが 2.12 の場合は、次のパッケージを使用します。

com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta

入力 [2]:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
 .appName('BigQuery Storage & Spark DataFrames') \
 .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
 .getOrCreate()

repl.eagerEval を有効にする

これにより、df.show() を表示しなくても各ステップで DataFrames の結果が出力され、出力のフォーマットも改善されます。

入力 [3]:

spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

BigQuery テーブルを Spark DataFrame に読み取る

一般公開された BigQuery データセットからデータを読み取って Spark DataFrame を作成します。これで spark-bigquery-connector と BigQuery Storage API を使いデータを Spark クラスタに読み込めます。

Spark DataFrame を作成し、Wikipedia ページビューの BigQuery 一般公開データセットからデータを読み込みます。spark-bigquery-connector を使用してデータを Spark に読み込み、そこでデータの処理が行われるため、データに対してクエリを実行していないことに注意してください。このコードを実行しても、Spark の遅延評価であるため、実際にはテーブルは読み込まれません。実行は次のステップで行われます。

入力 [4]:

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
  .load()

df_wiki_pageviews.printSchema()

出力 [4]:

c107a33f6fc30ca.png

必要な列を選択し、filter() のエイリアスである where() を使用してフィルタを適用します。

このコードを実行すると、Spark アクションがトリガーされ、この時点でデータは BigQuery Storage から読み取られます。

入力 [5]:

df_wiki_en = df_wiki_pageviews \
  .select("datehour", "wiki", "views") \
  .where("views > 1000 AND wiki in ('en', 'en.m')") \

df_wiki_en

出力 [5]:

ad363cbe510d625a.png

タイトル別にグループ分けし、ページビュー順に並べてトップ ページを確認する

入力 [6]:

import pyspark.sql.functions as F

df_datehour_totals = df_wiki_en \
  .groupBy("datehour") \
  .agg(F.sum('views').alias('total_views'))

df_datehour_totals.orderBy('total_views', ascending=False)

出力 [6]:f718abd05afc0f4.png

7. ノートブックで Python プロット ライブラリを使用する

Python で使用可能なさまざまなプロット ライブラリを使用して、Spark ジョブの出力をプロットできます。

Spark DataFrame を Pandas DataFrame に変換する

Spark DataFrame を Pandas DataFrame に変換し、datehour をインデックスとして設定します。これは、Python でデータを直接操作し、利用可能な多くの Python プロット ライブラリを使用してデータをプロットする場合に便利です。

入力 [7]:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()

pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()

出力 [7]:

3df2aaa2351f028d.png

Pandas DataFrame のプロット

ノートブックにプロットを表示するために必要な matplotlib ライブラリをインポートします。

入力 [8]:

import matplotlib.pyplot as plt

Pandas のプロット関数を使用して、Pandas DataFrame から折れ線グラフを作成します。

入力 [9]:

pandas_datehour_totals.plot(kind='line',figsize=(12,6));

出力 [9]:bade7042c3033594.png

ノートブックが GCS に保存されたことを確認する

これで、最初の Jupyter ノートブックが Dataproc クラスタで実行されるようになりました。ノートブックに名前を付けると、クラスタの作成時に使用された GCS バケットに自動的に保存されます。

これは、Cloud Shell で次の gsutil コマンドを使用して確認できます。

BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter

次のような内容が出力されます

gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb

8. 最適化のヒント - データをメモリにキャッシュ保存する

BigQuery Storage から毎回読み取るのではなく、メモリ内のデータを使用するシナリオもあります。

このジョブは BigQuery からデータを読み取り、フィルタを BigQuery に push します。集計は Apache Spark で計算されます。

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_en = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10 AND wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

上記のジョブを変更してテーブルのキャッシュを含めることができます。これにより、Apache Spark によって wiki 列のフィルタがメモリに適用されます。

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_all = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10")

# cache the data in memory
df_wiki_all.cache()

df_wiki_en = df_wiki_all \
 .where("wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

これにより、BigQuery ストレージからデータを再度読み取るのではなく、キャッシュに保存されたデータを使用して別の Wiki 言語をフィルタリングできるため、処理が大幅に高速化されます。

df_wiki_de = df_wiki_all \
 .where("wiki in ('de', 'de.m')")

df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_de_totals.orderBy('total_views', ascending=False)

キャッシュを削除するには、

df_wiki_all.unpersist()

9. その他のユースケースのノートブックの例

Cloud Dataproc GitHub リポジトリには、さまざまな Google Cloud Platform プロダクトとオープンソース ツールを使用して、データの読み込み、データの保存、データのプロットを行うための一般的な Apache Spark パターンを含む Jupyter ノートブックが用意されています。

10. クリーンアップ

このクイックスタートの完了後に GCP アカウントに不要な料金が発生しないようにするには:

  1. 作成した環境の Cloud Storage バケットを削除します
  2. Dataproc 環境を削除します

この Codelab 専用のプロジェクトを作成した場合は、必要に応じてプロジェクトを削除することもできます。

  1. GCP Console で、[プロジェクト] ページに移動します。
  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ボックスにプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

ライセンス

この作品は、クリエイティブ・コモンズの表示 3.0 汎用ライセンスと Apache 2.0 ライセンスにより使用許諾されています。