Cloud Dataproc 中的 Apache Spark 和 Jupyter Notebook

1. 總覽

本研究室將說明如何在 Cloud Dataproc 中設定及使用 Apache SparkJupyter 筆記本

Jupyter 筆記本廣泛用於探索性資料分析和建構機器學習模型,因為可讓您以互動方式執行程式碼,並立即查看結果。

不過,建立及使用 Apache Spark 和 Jupyter Notebook 可能不一。

b9ed855863c57d6.png

Cloud Dataproc 可讓您在約 90 秒內使用 Apache Spark、Jupyter 元件元件閘道建立 Dataproc 叢集,輕鬆快速地完成上述作業。

課程內容

在本程式碼研究室中,您可以瞭解如何執行以下操作:

  • 為叢集建立 Google Cloud Storage 值區
  • 使用 Jupyter 和元件閘道建立 Dataproc 叢集,
  • 在 Dataproc 上存取 JupyterLab 網頁版 UI
  • 建立筆記本,利用 Spark BigQuery 儲存空間連接器
  • 執行 Spark 工作並繪製結果。

在 Google Cloud 中執行這個研究室的總費用約為 $1 美元。如需 Cloud Dataproc 定價的完整資訊,請參閱這裡

2. 建立專案

前往 console.cloud.google.com 登入 Google Cloud Platform 控制台,並建立新專案:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

接下來,您需要在 Cloud 控制台中啟用計費功能,才能使用 Google Cloud 資源。

執行本程式碼研究室的成本不應超過數美元,但如果您決定使用更多資源或讓資源繼續運作,費用會增加。本程式碼研究室的最後一個部分會引導您清除專案。

新使用者符合 $300 美元免費試用資格的 Google Cloud Platform。

3. 設定環境

首先,按一下 Cloud 控制台右上角的按鈕,開啟 Cloud Shell:

a10c47ee6ca41c54.png

Cloud Shell 載入後,執行下列指令,設定上一步的專案 ID****:

gcloud config set project <project_id>

通常只要在 Cloud 控制台左上角點選您的專案,即可查看專案 ID:

b4b233632ce0c3c4.png

c7e39ffc6dec3765.png

接下來,啟用 Dataproc、Compute Engine 和 BigQuery Storage API。

gcloud services enable dataproc.googleapis.com \
  compute.googleapis.com \
  storage-component.googleapis.com \
  bigquery.googleapis.com \
  bigquerystorage.googleapis.com

您也可以在 Cloud 控制台中執行這項操作。按一下畫面左上方的「選單」圖示。

2bfc27ef9ba2ec7d.png

從下拉式選單中選取 API 管理員。

408af5f32c4b7c25.png

按一下「啟用 API 和服務」

a9c0e84296a7ba5b.png

搜尋並啟用下列 API:

  • Compute Engine API
  • Dataproc API
  • BigQuery API
  • BigQuery Storage API

4. 建立 GCS 值區

在最靠近資料的區域中建立 Google Cloud Storage 值區,並為值區指定專屬名稱。

這會用於 Dataproc 叢集。

REGION=us-central1
BUCKET_NAME=<your-bucket-name>

gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}

您應該會看到下列輸出內容

Creating gs://<your-bucket-name>/...

5. 使用 Jupyter 和元件閘道

正在建立叢集

設定叢集的環境變數

REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>

接著執行這個 gcloud 指令,建立叢集並加入所有必要元件,以便在叢集中使用 Jupyter。

gcloud beta dataproc clusters create ${CLUSTER_NAME} \
 --region=${REGION} \
 --image-version=1.4 \
 --master-machine-type=n1-standard-4 \
 --worker-machine-type=n1-standard-4 \
 --bucket=${BUCKET_NAME} \
 --optional-components=ANACONDA,JUPYTER \
 --enable-component-gateway 

建立叢集時,您應該會看到以下輸出內容

Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...

叢集建立作業大約需要 90 秒才能完成,準備就緒後,您就能透過 Dataproc Cloud 控制台 UI 存取叢集。

等待期間,您可以繼續閱讀下方內容,進一步瞭解在 gcloud 指令中使用的標記。

叢集建立後,您應該會看到以下輸出內容:

Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].

gcloud Dataproc create 指令中使用的旗標

以下是 gcloud Dataproc create 指令中所用標記的細目

--region=${REGION}

指定要在哪個區域和可用區建立叢集。您可以前往這裡查看適用地區清單。

--image-version=1.4

要在叢集中使用的映像檔版本。您可以前往這裡查看可用版本清單。

--bucket=${BUCKET_NAME}

指定先前為叢集建立的 Google Cloud Storage 值區。如未提供 GCS 值區,系統會為您建立值區。

即使 GCS 值區不會遭到刪除,叢集也不會刪除,因此筆記本仍會儲存在這裡。

--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4

要用於 Dataproc 叢集的機器類型。您可以在這裡查看可用的機器類型清單。

根據預設,如果您未設定旗標「-num-workers」,系統就會建立 1 個主要節點和 2 個工作站節點

--optional-components=ANACONDA,JUPYTER

設定這些值的選用元件後,系統會在叢集中安裝所有必要的 Jupyter 和 Anaconda 程式庫。

--enable-component-gateway

啟用元件閘道後,系統會使用 Apache Knox 和反向 Proxy 建立 App Engine 連結,讓您以安全、安全且經過驗證的存取 Jupyter 和 JupyterLab 網頁介面。也就是說,您不再需要建立 SSH 通道。

這項作業也會為叢集中的其他工具建立連結,包括 Yarn Resource Manager 和 Spark History Server,方便您查看工作效能和叢集使用模式。

6. 建立 Apache Spark 筆記本

存取 JupyterLab 網頁介面

叢集準備就緒之後,如要前往 JupyterLab 網頁介面的元件閘道連結,請依序前往「Dataproc 叢集 - Cloud 控制台」和您建立的叢集,然後前往「網頁介面」分頁。

afc40202d555de47.png

您會發現您可以使用 Jupyter 服務。Jupyter 是傳統版筆記本介面,簡稱 JupyterLab,而 JupyterLab 是 Project Jupyter 的新一代 UI。

JupyterLab 中有許多很棒的新 UI 功能,因此如果你剛開始使用筆記本,或是想尋找最新的改善項目,建議使用 JupyterLab,因為 JupyterLab 最終會參考官方文件,最終取代傳統版 Jupyter 介面。

使用 Python 3 核心建立筆記本

a463623f2ebf0518.png

在啟動器分頁中按一下「Python 3 筆記本」圖示,使用 Python 3 核心 (而非 PySpark 核心) 建立筆記本,這樣您就能在筆記本中設定 SparkSession,並加入使用 BigQuery Storage API 所需的 spark-bigquery-connector

重新命名筆記本

196a3276ed07e1f3.png

在左側或上方導覽列的筆記本名稱上按一下滑鼠右鍵,然後將筆記本重新命名為「BigQuery Storage &Spark DataFrames.ipynb」

在筆記本中執行 Spark 程式碼

fbac38062e5bb9cf.png

在這個筆記本中,您將使用 spark-bigquery-connector 這項工具來讀取及寫入 BigQuery 和 Spark 之間的資料,並利用 BigQuery Storage API

BigQuery Storage API 使用以遠端程序呼叫 (RPC) 為基礎的通訊協定,可大幅改善存取 BigQuery 資料的方式。同時支援平行讀取和寫入資料,以及各種序列化格式,例如 Apache AvroApache Arrow。整體來說,這可大幅改善成效,特別是在資料集較大的情況下。

在第一個儲存格中,檢查叢集的 Scala 版本,這樣您就能加入正確的 spark-bigquery-connector jar 版本。

輸入內容 [1]:

!scala -version

輸出 [1]:f580e442576b8b1f.png建立 Spark 工作階段並納入 spark-bigquery-connector 套件。

如果您的 Scala 版本是 2.11,請使用下列套件。

com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta

如果您的 Scala 版本是 2.12,請使用下列套件。

com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta

輸入內容 [2]:

from pyspark.sql import SparkSession
spark = SparkSession.builder \
 .appName('BigQuery Storage & Spark DataFrames') \
 .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
 .getOrCreate()

啟用 repl.eagerEval

這將輸出每個步驟中的 DataFrames 結果,您無需顯示 df.show(),同時也會改善輸出的格式。

輸入內容 [3]:

spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

將 BigQuery 資料表讀取至 Spark DataFrame

讀取公開 BigQuery 資料集的資料來建立 Spark DataFrame。也就是使用 spark-bigquery-connector 和 BigQuery Storage API,將資料載入 Spark 叢集。

建立 Spark DataFrame 並從 Wikipedia 的 BigQuery 公開資料集載入資料。您會注意到,您正在使用 spark-bigquery-connector 將資料載入 Spark 並在該 Spark 中處理資料。執行這個程式碼時,系統並不會實際載入資料表,因為資料表是在 Spark 中進行延遲評估,且執行作業會在下一個步驟中進行。

輸入內容 [4]:

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
  .load()

df_wiki_pageviews.printSchema()

輸出內容 [4]:

c107a33f6fc30ca.png

選取需要的資料欄,然後使用 where() (filter() 的別名) 套用篩選器。

執行這個程式碼時,會觸發 Spark 動作,而此時系統會從 BigQuery Storage 讀取資料。

輸入內容 [5]:

df_wiki_en = df_wiki_pageviews \
  .select("datehour", "wiki", "views") \
  .where("views > 1000 AND wiki in ('en', 'en.m')") \

df_wiki_en

輸出內容 [5]:

ad363cbe510d625a.png

依標題和網頁瀏覽次數分組,以查看熱門網頁

輸入內容 [6]:

import pyspark.sql.functions as F

df_datehour_totals = df_wiki_en \
  .groupBy("datehour") \
  .agg(F.sum('views').alias('total_views'))

df_datehour_totals.orderBy('total_views', ascending=False)

輸出內容 [6]:f718abd05afc0f4.png

7. 在筆記本中使用 Python 繪圖程式庫

您可以利用 Python 提供的各種繪圖程式庫繪製 Spark 工作的輸出內容。

將 Spark DataFrame 轉換為 Pandas DataFrame

將 Spark DataFrame 轉換為 Pandas DataFrame,並將日期小時設為索引。如果您想直接透過 Python 處理資料,並使用許多可用的 Python 繪圖程式庫繪製資料,這項功能就非常實用。

輸入內容 [7]:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()

pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()

輸出內容 [7]:

3df2aaa2351f028d.png

繪製 Pandas DataFrame

匯入必要的 matDrawlib 程式庫,才能在筆記本中顯示圖

輸入內容 [8]:

import matplotlib.pyplot as plt

使用 Pandas DataFrame 建立折線圖。

輸入內容 [9]:

pandas_datehour_totals.plot(kind='line',figsize=(12,6));

輸出內容 [9]:bade7042c3033594.png

檢查筆記本是否已儲存在 GCS 中

現在,您的第一個 Jupyter 筆記本應該已在 Dataproc 叢集中啟動並開始運作。為筆記本命名,即可自動儲存至建立叢集時使用的 GCS 值區。

您可以在 Cloud Shell 中使用這個 gsutil 指令進行檢查

BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter

您應該會看到下列輸出內容

gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb

8. 最佳化提示 - 快取記憶體中的資料

在某些情況下,您可能會想將資料儲存在記憶體中,而非每次都從 BigQuery 儲存空間讀取資料。

這項工作會從 BigQuery 讀取資料,並將篩選器推送至 BigQuery。系統會在 Apache Spark 中計算匯總結果。

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_en = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10 AND wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

您可以修改上述工作來加入資料表的快取,之後 Apache Spark 就會在記憶體中套用維基資料欄的篩選器。

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_all = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10")

# cache the data in memory
df_wiki_all.cache()

df_wiki_en = df_wiki_all \
 .where("wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

接著,您可以使用快取資料篩選其他維基語言,不必再次從 BigQuery 儲存空間讀取資料,因此執行速度會更快。

df_wiki_de = df_wiki_all \
 .where("wiki in ('de', 'de.m')")

df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_de_totals.orderBy('total_views', ascending=False)

如要移除快取,請執行以下指令:

df_wiki_all.unpersist()

9. 用於更多用途的筆記本範例

Cloud Dataproc GitHub 存放區提供 Jupyter 筆記本,常見的 Apache Spark 模式可用於載入資料、儲存資料,以及使用各種 Google Cloud Platform 產品和開放原始碼工具繪製資料:

10. 清除所用資源

完成本快速入門導覽課程後,如要避免系統向您的 GCP 帳戶收取不必要的費用,請按照下列步驟操作:

  1. 刪除環境和您建立的 Cloud Storage 值區
  2. 刪除 Dataproc 環境

如果您只針對本程式碼研究室建立專案,也可以選擇刪除專案:

  1. 在 GCP 控制台中,前往「專案頁面。
  2. 在專案清單中,選取要刪除的專案,然後按一下「Delete」(刪除)
  3. 在方塊中輸入專案 ID,然後按一下「Shut down」(關閉) 即可刪除專案。

授權

本作品採用創用 CC 姓名標示 3.0 通用授權,以及 Apache 2.0 授權。