Cloud Dataproc 中的 Apache Spark 和 Jupyter Notebook

1. 總覽

本實驗室將說明如何在 Cloud Dataproc 上設定及使用 Apache SparkJupyter 筆記本

Jupyter 記事本可讓您以互動方式執行程式碼,並立即查看結果,因此廣泛用於探索性資料分析和建構機器學習模型。

不過,設定及使用 Apache Spark 和 Jupyter 筆記本可能很複雜。

b9ed855863c57d6.png

Cloud Dataproc 可讓您在約 90 秒內建立包含 Apache Spark、Jupyter 元件元件閘道的 Dataproc 叢集,輕鬆快速地完成這項工作。

課程內容

在本程式碼研究室中,您可以瞭解如何執行以下操作:

  • 為叢集建立 Google Cloud Storage bucket
  • 建立包含 Jupyter 和元件閘道的 Dataproc 叢集,
  • 存取 Dataproc 上的 JupyterLab 網頁介面
  • 建立筆記本,並使用 Spark BigQuery Storage 連接器
  • 執行 Spark 工作並繪製結果。

在 Google Cloud 上執行這個實驗室的總費用約為 $1 美元。如要瞭解 Cloud Dataproc 的完整定價資訊,請參閱這篇文章

2. 建立專案

前往 console.cloud.google.com 登入 Google Cloud Platform Console,然後建立新專案:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

接著,您需要在 Cloud 控制台中啟用帳單,才能使用 Google Cloud 資源。

完成這項程式碼研究室的費用不應超過數美元,但如果您決定使用更多資源,或是將資源繼續執行,則可能會增加費用。本程式碼研究室的最後一節將逐步說明如何清除專案。

Google Cloud Platform 新使用者享有價值 $300 美元的免費試用期

3. 設定環境

首先,按一下雲端控制台右上角的按鈕,開啟 Cloud Shell:

a10c47ee6ca41c54.png

Cloud Shell 載入後,請執行下列指令,設定上一個步驟中的專案 ID

gcloud config set project <project_id>

您也可以在 Cloud 控制台的左上方點選專案,即可找到專案 ID:

b4b233632ce0c3c4.png

c7e39ffc6dec3765.png

接著,請啟用 Dataproc、Compute Engine 和 BigQuery Storage API。

gcloud services enable dataproc.googleapis.com \
  compute.googleapis.com \
  storage-component.googleapis.com \
  bigquery.googleapis.com \
  bigquerystorage.googleapis.com

您也可以在 Cloud 控制台中執行這項操作。按一下畫面左上方的「選單」圖示。

2bfc27ef9ba2ec7d.png

從下拉式選單中選取「API 管理工具」。

408af5f32c4b7c25.png

按一下「啟用 API 和服務」

a9c0e84296a7ba5b.png

搜尋並啟用下列 API:

  • Compute Engine API
  • Dataproc API
  • BigQuery API
  • BigQuery Storage API

4. 建立 GCS bucket

在最靠近資料的區域建立 Google Cloud Storage bucket,並給予不重複的名稱。

這會用於 Dataproc 叢集。

REGION=us-central1
BUCKET_NAME=<your-bucket-name>

gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}

輸出內容應如下所示:

Creating gs://<your-bucket-name>/...

5. 建立包含 Jupyter 和元件閘道的 Dataproc 叢集

建立叢集

設定叢集的環境變數

REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>

然後執行這項 gcloud 指令,建立叢集並搭載所有必要元件,以便在叢集上使用 Jupyter。

gcloud beta dataproc clusters create ${CLUSTER_NAME} \
 --region=${REGION} \
 --image-version=1.4 \
 --master-machine-type=n1-standard-4 \
 --worker-machine-type=n1-standard-4 \
 --bucket=${BUCKET_NAME} \
 --optional-components=ANACONDA,JUPYTER \
 --enable-component-gateway 

叢集建立期間,您應該會看到下列輸出內容

Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...

建立叢集大約需要 90 秒,完成後即可透過 Dataproc Cloud 控制台 UI 存取叢集。

等待期間,您可以繼續閱讀下文,進一步瞭解 gcloud 指令中使用的標記。

叢集建立完成後,您應該會看到下列輸出內容:

Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].

gcloud dataproc create 指令中使用的旗標

以下是 gcloud dataproc create 指令中使用的旗標明細

--region=${REGION}

指定要建立叢集的區域和可用區。如要查看適用區域清單,請按這裡

--image-version=1.4

要在叢集中使用的映像檔版本。如要查看可用版本清單,請按這裡

--bucket=${BUCKET_NAME}

指定您先前建立的 Google Cloud Storage bucket,供叢集使用。如未提供 GCS bucket,系統會為您建立。

即使刪除叢集,筆記本也會儲存在這裡,因為 GCS 值區不會遭到刪除。

--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4

用於 Dataproc 叢集的機器類型。如要查看可用的機器類型清單,請按這裡

如未設定 -num-workers 旗標,系統預設會建立 1 個主要節點和 2 個工作站節點

--optional-components=ANACONDA,JUPYTER

選用元件設定這些值後,系統就會在叢集上安裝 Jupyter 和 Anaconda (Jupyter 筆記本需要 Anaconda) 的所有必要程式庫。

--enable-component-gateway

啟用「元件閘道」會使用 Apache Knox 和反向 Proxy 建立 App Engine 連結,方便您以經過驗證的方式安全地存取 Jupyter 和 JupyterLab 網頁介面,因此您不再需要建立 SSH 通道。

此外,系統也會為叢集上的其他工具建立連結,包括 Yarn 資源管理員和 Spark 記錄伺服器,方便您查看作業的效能和叢集使用模式。

6. 建立 Apache Spark 筆記本

存取 JupyterLab 網頁介面

叢集就緒後,請前往 Dataproc 叢集 - Cloud 控制台,按一下您建立的叢集,然後前往「Web Interfaces」(網頁介面) 分頁,即可找到 JupyterLab 網頁介面的元件閘道連結。

afc40202d555de47.png

您會發現自己可以存取 Jupyter (傳統筆記本介面) 或 JupyterLab (Project Jupyter 的新一代 UI)。

JupyterLab 提供了許多實用的全新 UI 功能,因此如果您是筆記本新手,或是想使用最新改良功能,建議使用 JupyterLab,因為根據官方文件,JupyterLab 最終會取代傳統的 Jupyter 介面。

建立以 Python 3 核心為基礎的筆記本

a463623f2ebf0518.png

在啟動器分頁中,按一下 Python 3 筆記本圖示,建立具有 Python 3 核心 (而非 PySpark 核心) 的筆記本,即可在筆記本中設定 SparkSession,並加入使用 BigQuery Storage API 時所需的 spark-bigquery-connector

重新命名筆記本

196a3276ed07e1f3.png

在左側邊欄或頂端導覽列中,按一下筆記本名稱,然後將筆記本重新命名為「BigQuery Storage & Spark DataFrames.ipynb」

在筆記本中執行 Spark 程式碼

fbac38062e5bb9cf.png

在本筆記本中,您將使用 spark-bigquery-connector,這是用於在 BigQuery 和 Spark 之間讀取及寫入資料的工具,可運用 BigQuery Storage API

BigQuery Storage API 採用 RPC 架構的通訊協定,可大幅提升 BigQuery 資料存取效能。支援平行讀取和寫入資料,以及各種序列化格式,例如 Apache AvroApache Arrow。整體而言,這代表效能大幅提升,尤其是在較大的資料集上。

在第一個儲存格中檢查叢集的 Scala 版本,以便加入正確版本的 spark-bigquery-connector jar。

輸入 [1]:

!scala -version

輸出 [1]:f580e442576b8b1f.png 建立 Spark 工作階段,並加入 spark-bigquery-connector 套件。

如果 Scala 版本為 2.11,請使用下列套件。

com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta

如果 Scala 版本為 2.12,請使用下列套件。

com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta

輸入 [2]:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
 .appName('BigQuery Storage & Spark DataFrames') \
 .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
 .getOrCreate()

啟用 repl.eagerEval

這樣一來,系統就會輸出每個步驟的 DataFrame 結果,不必再顯示 df.show(),輸出格式也會更完善。

輸入 [3]:

spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

將 BigQuery 資料表讀取至 Spark DataFrame

從公開 BigQuery 資料集讀取資料,建立 Spark DataFrame。這會使用 spark-bigquery-connector 和 BigQuery Storage API,將資料載入 Spark 叢集。

建立 Spark DataFrame,並從 BigQuery 公開資料集 (Wikipedia 網頁瀏覽量) 載入資料。您會發現自己並未對資料執行查詢,因為您是使用 spark-bigquery-connector 將資料載入 Spark,並在其中處理資料。執行這段程式碼時,系統不會實際載入資料表,因為這是 Spark 中的延遲評估,執行作業會在下一個步驟中進行。

輸入內容 [4]:

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
  .load()

df_wiki_pageviews.printSchema()

輸出內容 [4]:

c107a33f6fc30ca.png

選取必要欄,然後使用 where() (filter() 的別名) 套用篩選器。

執行這段程式碼時,系統會觸發 Spark 動作,並從 BigQuery Storage 讀取資料。

輸入 [5]:

df_wiki_en = df_wiki_pageviews \
  .select("datehour", "wiki", "views") \
  .where("views > 1000 AND wiki in ('en', 'en.m')") \

df_wiki_en

輸出內容 [5]:

ad363cbe510d625a.png

依網頁瀏覽次數排序並依標題分組,即可查看熱門網頁

輸入 [6]:

import pyspark.sql.functions as F

df_datehour_totals = df_wiki_en \
  .groupBy("datehour") \
  .agg(F.sum('views').alias('total_views'))

df_datehour_totals.orderBy('total_views', ascending=False)

輸出內容 [6]:f718abd05afc0f4.png

7. 在筆記本中使用 Python 繪圖程式庫

您可以利用 Python 中提供的各種繪圖程式庫,繪製 Spark 作業的輸出內容。

將 Spark DataFrame 轉換為 Pandas DataFrame

將 Spark DataFrame 轉換為 Pandas DataFrame,並將 datehour 設為索引。如果您想直接在 Python 中處理資料,並使用許多可用的 Python 繪圖程式庫繪製資料,這項功能就非常實用。

輸入 [7]:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()

pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()

輸出內容 [7]:

3df2aaa2351f028d.png

繪製 Pandas DataFrame 圖表

匯入 matplotlib 程式庫,這是筆記本中顯示繪圖時的必要程式庫

輸入 [8]:

import matplotlib.pyplot as plt

使用 Pandas 繪圖函式,從 Pandas DataFrame 建立折線圖。

輸入 [9]:

pandas_datehour_totals.plot(kind='line',figsize=(12,6));

輸出內容 [9]:bade7042c3033594.png

確認筆記本已儲存至 GCS

現在,您應該已在 Dataproc 叢集上啟動並執行第一個 Jupyter 筆記本。為筆記本命名,系統就會自動將筆記本儲存至建立叢集時使用的 GCS bucket。

您可以在 Cloud Shell 中使用下列 gsutil 指令檢查這項設定:

BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter

輸出內容應如下所示:

gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb

8. 最佳化提示 - 在記憶體中快取資料

在某些情況下,您可能希望資料位於記憶體中,而不是每次都從 BigQuery 儲存空間讀取。

這項工作會從 BigQuery 讀取資料,並將篩選條件推送至 BigQuery。接著,系統會在 Apache Spark 中計算匯總資料。

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_en = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10 AND wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

您可以修改上述工作,加入表格的快取,現在 Apache Spark 會在記憶體中套用 wiki 資料欄的篩選器。

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_all = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10")

# cache the data in memory
df_wiki_all.cache()

df_wiki_en = df_wiki_all \
 .where("wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

接著,您可以使用快取資料篩選其他維基語言,不必再次從 BigQuery 儲存空間讀取資料,因此執行速度會快上許多。

df_wiki_de = df_wiki_all \
 .where("wiki in ('de', 'de.m')")

df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_de_totals.orderBy('total_views', ascending=False)

您可以執行下列指令來移除快取:

df_wiki_all.unpersist()

9. 更多用途的範例筆記本

Cloud Dataproc GitHub 存放區提供 Jupyter 筆記本,其中包含常見的 Apache Spark 模式,可搭配各種 Google Cloud Platform 產品和開放原始碼工具載入、儲存及繪製資料:

10. 清除所用資源

完成本快速入門導覽課程後,如要避免系統向您的 GCP 帳戶收取不必要的費用,請按照下列步驟操作:

  1. 刪除您建立的環境 Cloud Storage bucket
  2. 刪除 Dataproc 環境

如果您專為這個程式碼研究室建立了專案,也可以選擇刪除專案:

  1. 前往 GCP 主控台的「Projects」(專案) 頁面。
  2. 在專案清單中選取要刪除的專案,然後點按「Delete」(刪除)
  3. 在方塊中輸入專案 ID,然後按一下「Shut down」(關閉) 刪除專案。

授權

這項內容採用的授權為創用 CC 姓名標示 3.0 通用授權和 Apache 2.0 授權。