1. 總覽
本研究室將說明如何在 Cloud Dataproc 中設定及使用 Apache Spark 和 Jupyter 筆記本。
Jupyter 筆記本廣泛用於探索性資料分析和建構機器學習模型,因為可讓您以互動方式執行程式碼,並立即查看結果。
不過,建立及使用 Apache Spark 和 Jupyter Notebook 可能不一。
Cloud Dataproc 可讓您在約 90 秒內使用 Apache Spark、Jupyter 元件和元件閘道建立 Dataproc 叢集,輕鬆快速地完成上述作業。
課程內容
在本程式碼研究室中,您可以瞭解如何執行以下操作:
- 為叢集建立 Google Cloud Storage 值區
- 使用 Jupyter 和元件閘道建立 Dataproc 叢集,
- 在 Dataproc 上存取 JupyterLab 網頁版 UI
- 建立筆記本,利用 Spark BigQuery 儲存空間連接器
- 執行 Spark 工作並繪製結果。
在 Google Cloud 中執行這個研究室的總費用約為 $1 美元。如需 Cloud Dataproc 定價的完整資訊,請參閱這裡。
2. 建立專案
前往 console.cloud.google.com 登入 Google Cloud Platform 控制台,並建立新專案:
接下來,您需要在 Cloud 控制台中啟用計費功能,才能使用 Google Cloud 資源。
執行本程式碼研究室的成本不應超過數美元,但如果您決定使用更多資源或讓資源繼續運作,費用會增加。本程式碼研究室的最後一個部分會引導您清除專案。
新使用者符合 $300 美元免費試用資格的 Google Cloud Platform。
3. 設定環境
首先,按一下 Cloud 控制台右上角的按鈕,開啟 Cloud Shell:
Cloud Shell 載入後,執行下列指令,設定上一步的專案 ID****:
gcloud config set project <project_id>
通常只要在 Cloud 控制台左上角點選您的專案,即可查看專案 ID:
接下來,啟用 Dataproc、Compute Engine 和 BigQuery Storage API。
gcloud services enable dataproc.googleapis.com \
compute.googleapis.com \
storage-component.googleapis.com \
bigquery.googleapis.com \
bigquerystorage.googleapis.com
您也可以在 Cloud 控制台中執行這項操作。按一下畫面左上方的「選單」圖示。
從下拉式選單中選取 API 管理員。
按一下「啟用 API 和服務」。
搜尋並啟用下列 API:
- Compute Engine API
- Dataproc API
- BigQuery API
- BigQuery Storage API
4. 建立 GCS 值區
在最靠近資料的區域中建立 Google Cloud Storage 值區,並為值區指定專屬名稱。
這會用於 Dataproc 叢集。
REGION=us-central1
BUCKET_NAME=<your-bucket-name>
gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}
您應該會看到下列輸出內容
Creating gs://<your-bucket-name>/...
5. 使用 Jupyter 和元件閘道
正在建立叢集
設定叢集的環境變數
REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>
接著執行這個 gcloud 指令,建立叢集並加入所有必要元件,以便在叢集中使用 Jupyter。
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--region=${REGION} \
--image-version=1.4 \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--bucket=${BUCKET_NAME} \
--optional-components=ANACONDA,JUPYTER \
--enable-component-gateway
建立叢集時,您應該會看到以下輸出內容
Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...
叢集建立作業大約需要 90 秒才能完成,準備就緒後,您就能透過 Dataproc Cloud 控制台 UI 存取叢集。
等待期間,您可以繼續閱讀下方內容,進一步瞭解在 gcloud 指令中使用的標記。
叢集建立後,您應該會看到以下輸出內容:
Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].
gcloud Dataproc create 指令中使用的旗標
以下是 gcloud Dataproc create 指令中所用標記的細目
--region=${REGION}
指定要在哪個區域和可用區建立叢集。您可以前往這裡查看適用地區清單。
--image-version=1.4
要在叢集中使用的映像檔版本。您可以前往這裡查看可用版本清單。
--bucket=${BUCKET_NAME}
指定先前為叢集建立的 Google Cloud Storage 值區。如未提供 GCS 值區,系統會為您建立值區。
即使 GCS 值區不會遭到刪除,叢集也不會刪除,因此筆記本仍會儲存在這裡。
--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4
要用於 Dataproc 叢集的機器類型。您可以在這裡查看可用的機器類型清單。
根據預設,如果您未設定旗標「-num-workers」,系統就會建立 1 個主要節點和 2 個工作站節點
--optional-components=ANACONDA,JUPYTER
設定這些值的選用元件後,系統會在叢集中安裝所有必要的 Jupyter 和 Anaconda 程式庫。
--enable-component-gateway
啟用元件閘道後,系統會使用 Apache Knox 和反向 Proxy 建立 App Engine 連結,讓您以安全、安全且經過驗證的存取 Jupyter 和 JupyterLab 網頁介面。也就是說,您不再需要建立 SSH 通道。
這項作業也會為叢集中的其他工具建立連結,包括 Yarn Resource Manager 和 Spark History Server,方便您查看工作效能和叢集使用模式。
6. 建立 Apache Spark 筆記本
存取 JupyterLab 網頁介面
叢集準備就緒之後,如要前往 JupyterLab 網頁介面的元件閘道連結,請依序前往「Dataproc 叢集 - Cloud 控制台」和您建立的叢集,然後前往「網頁介面」分頁。
您會發現您可以使用 Jupyter 服務。Jupyter 是傳統版筆記本介面,簡稱 JupyterLab,而 JupyterLab 是 Project Jupyter 的新一代 UI。
JupyterLab 中有許多很棒的新 UI 功能,因此如果你剛開始使用筆記本,或是想尋找最新的改善項目,建議使用 JupyterLab,因為 JupyterLab 最終會參考官方文件,最終取代傳統版 Jupyter 介面。
使用 Python 3 核心建立筆記本
在啟動器分頁中按一下「Python 3 筆記本」圖示,使用 Python 3 核心 (而非 PySpark 核心) 建立筆記本,這樣您就能在筆記本中設定 SparkSession,並加入使用 BigQuery Storage API 所需的 spark-bigquery-connector。
重新命名筆記本
在左側或上方導覽列的筆記本名稱上按一下滑鼠右鍵,然後將筆記本重新命名為「BigQuery Storage &Spark DataFrames.ipynb」
在筆記本中執行 Spark 程式碼
在這個筆記本中,您將使用 spark-bigquery-connector 這項工具來讀取及寫入 BigQuery 和 Spark 之間的資料,並利用 BigQuery Storage API。
BigQuery Storage API 使用以遠端程序呼叫 (RPC) 為基礎的通訊協定,可大幅改善存取 BigQuery 資料的方式。同時支援平行讀取和寫入資料,以及各種序列化格式,例如 Apache Avro 和 Apache Arrow。整體來說,這可大幅改善成效,特別是在資料集較大的情況下。
在第一個儲存格中,檢查叢集的 Scala 版本,這樣您就能加入正確的 spark-bigquery-connector jar 版本。
輸入內容 [1]:
!scala -version
輸出 [1]:建立 Spark 工作階段並納入 spark-bigquery-connector 套件。
如果您的 Scala 版本是 2.11,請使用下列套件。
com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta
如果您的 Scala 版本是 2.12,請使用下列套件。
com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta
輸入內容 [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('BigQuery Storage & Spark DataFrames') \
.config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
.getOrCreate()
啟用 repl.eagerEval
這將輸出每個步驟中的 DataFrames 結果,您無需顯示 df.show(),同時也會改善輸出的格式。
輸入內容 [3]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)
將 BigQuery 資料表讀取至 Spark DataFrame
讀取公開 BigQuery 資料集的資料來建立 Spark DataFrame。也就是使用 spark-bigquery-connector 和 BigQuery Storage API,將資料載入 Spark 叢集。
建立 Spark DataFrame 並從 Wikipedia 的 BigQuery 公開資料集載入資料。您會注意到,您正在使用 spark-bigquery-connector 將資料載入 Spark 並在該 Spark 中處理資料。執行這個程式碼時,系統並不會實際載入資料表,因為資料表是在 Spark 中進行延遲評估,且執行作業會在下一個步驟中進行。
輸入內容 [4]:
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_pageviews.printSchema()
輸出內容 [4]:
選取需要的資料欄,然後使用 where()
(filter()
的別名) 套用篩選器。
執行這個程式碼時,會觸發 Spark 動作,而此時系統會從 BigQuery Storage 讀取資料。
輸入內容 [5]:
df_wiki_en = df_wiki_pageviews \
.select("datehour", "wiki", "views") \
.where("views > 1000 AND wiki in ('en', 'en.m')") \
df_wiki_en
輸出內容 [5]:
依標題和網頁瀏覽次數分組,以查看熱門網頁
輸入內容 [6]:
import pyspark.sql.functions as F
df_datehour_totals = df_wiki_en \
.groupBy("datehour") \
.agg(F.sum('views').alias('total_views'))
df_datehour_totals.orderBy('total_views', ascending=False)
輸出內容 [6]:
7. 在筆記本中使用 Python 繪圖程式庫
您可以利用 Python 提供的各種繪圖程式庫繪製 Spark 工作的輸出內容。
將 Spark DataFrame 轉換為 Pandas DataFrame
將 Spark DataFrame 轉換為 Pandas DataFrame,並將日期小時設為索引。如果您想直接透過 Python 處理資料,並使用許多可用的 Python 繪圖程式庫繪製資料,這項功能就非常實用。
輸入內容 [7]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()
pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()
輸出內容 [7]:
繪製 Pandas DataFrame
匯入必要的 matDrawlib 程式庫,才能在筆記本中顯示圖
輸入內容 [8]:
import matplotlib.pyplot as plt
使用 Pandas DataFrame 建立折線圖。
輸入內容 [9]:
pandas_datehour_totals.plot(kind='line',figsize=(12,6));
輸出內容 [9]:
檢查筆記本是否已儲存在 GCS 中
現在,您的第一個 Jupyter 筆記本應該已在 Dataproc 叢集中啟動並開始運作。為筆記本命名,即可自動儲存至建立叢集時使用的 GCS 值區。
您可以在 Cloud Shell 中使用這個 gsutil 指令進行檢查
BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter
您應該會看到下列輸出內容
gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb
8. 最佳化提示 - 快取記憶體中的資料
在某些情況下,您可能會想將資料儲存在記憶體中,而非每次都從 BigQuery 儲存空間讀取資料。
這項工作會從 BigQuery 讀取資料,並將篩選器推送至 BigQuery。系統會在 Apache Spark 中計算匯總結果。
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_en = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10 AND wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
您可以修改上述工作來加入資料表的快取,之後 Apache Spark 就會在記憶體中套用維基資料欄的篩選器。
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_all = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10")
# cache the data in memory
df_wiki_all.cache()
df_wiki_en = df_wiki_all \
.where("wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
接著,您可以使用快取資料篩選其他維基語言,不必再次從 BigQuery 儲存空間讀取資料,因此執行速度會更快。
df_wiki_de = df_wiki_all \
.where("wiki in ('de', 'de.m')")
df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_de_totals.orderBy('total_views', ascending=False)
如要移除快取,請執行以下指令:
df_wiki_all.unpersist()
9. 用於更多用途的筆記本範例
Cloud Dataproc GitHub 存放區提供 Jupyter 筆記本,常見的 Apache Spark 模式可用於載入資料、儲存資料,以及使用各種 Google Cloud Platform 產品和開放原始碼工具繪製資料:
10. 清除所用資源
完成本快速入門導覽課程後,如要避免系統向您的 GCP 帳戶收取不必要的費用,請按照下列步驟操作:
- 刪除環境和您建立的 Cloud Storage 值區
- 刪除 Dataproc 環境。
如果您只針對本程式碼研究室建立專案,也可以選擇刪除專案:
- 在 GCP 控制台中,前往「專案」頁面。
- 在專案清單中,選取要刪除的專案,然後按一下「Delete」(刪除)。
- 在方塊中輸入專案 ID,然後按一下「Shut down」(關閉) 即可刪除專案。
授權
本作品採用創用 CC 姓名標示 3.0 通用授權,以及 Apache 2.0 授權。