在 Dataproc 上使用 PySpark 預先處理 BigQuery 資料

1. 總覽

本程式碼研究室將說明如何在 Google Cloud Platform 上使用 Apache SparkDataproc 建立資料處理管道。在資料科學和資料工程領域中,讀取一個儲存位置的資料、對資料執行轉換,然後寫入另一個儲存位置,是常見的用途。常見的轉換作業包括變更資料內容、移除不必要的資訊,以及變更檔案類型。

在這個程式碼研究室中,您將瞭解 Apache Spark,並使用 Dataproc 搭配 PySpark (Apache Spark 的 Python API)、BigQueryGoogle Cloud Storage 和 Reddit 的資料,執行範例管道。

2. Apache Spark 簡介 (選用)

根據網站說明,「Apache Spark 是用於大規模資料處理的整合數據分析引擎」。讓您在記憶體中並行分析及處理資料,進而跨多台電腦和節點進行大量並行運算。這項工具最初於 2014 年推出,是傳統 MapReduce 的升級版,至今仍是執行大規模運算時最受歡迎的架構之一。Apache Spark 是以 Scala 編寫,後來又推出 Scala、Java、Python 和 R 的 API。其中包含許多程式庫,例如用於對資料執行 SQL 查詢的 Spark SQL、用於串流資料的 Spark Streaming、用於機器學習的 MLlib,以及用於圖形處理的 GraphX,所有程式庫都會在 Apache Spark 引擎上執行。

32add0b6a47bafbc.png

Spark 可以自行執行,也可以利用 YarnMesosKubernetes 等資源管理服務進行調度。您將在本程式碼研究室中使用 Dataproc,該程式碼研究室會使用 Yarn。

Spark 中的資料原本會載入記憶體,並轉為 RDD (彈性分散式資料集)。自此之後,Spark 的開發作業已新增兩種新的資料欄式資料類型:有型別的資料集和無型別的資料框架。簡單來說,RDD 適合用於任何類型的資料,而資料集和資料框架則是針對表格資料進行最佳化。由於資料集僅適用於 Java 和 Scala API,我們會繼續使用 PySpark Dataframe API 進行本程式碼研究室。詳情請參閱 Apache Spark 說明文件

3. 用途

資料工程師通常需要讓數據資料學家輕鬆存取資料。不過,資料一開始通常都很雜亂 (難以用於現況分析),因此需要經過清理才能派上用場。例如從網路擷取的資料,可能含有奇怪的編碼或多餘的 HTML 標記。

在本研究室中,您將從 BigQuery 以 Reddit 貼文的形式載入一組資料,並將其載入在 Dataproc 上代管的 Spark 叢集,擷取實用資訊,然後將處理後的資料儲存為壓縮 CSV 檔案,並儲存在 Google Cloud Storage 中。

be2a4551ece63bfc.png

貴公司的首席數據科學家有意讓團隊處理不同的自然語言處理問題。具體來說,他們想分析「r/food」這個子版的資料。您將建立資料傾印管道,從 2017 年 1 月至 2019 年 8 月的資料開始回填。

4. 透過 BigQuery Storage API 存取 BigQuery

隨著資料量增加,使用 tabledata.list API 方法從 BigQuery 提取資料可能會耗時且效率不彰。這個方法會傳回 JSON 物件清單,並需要逐頁依序讀取,才能讀取整個資料集。

BigQuery Storage API 採用以 RPC 為基礎的通訊協定,大幅改善 BigQuery 資料存取功能。它支援並行讀取和寫入資料,以及各種序列化格式,例如 Apache AvroApache Arrow。整體來說,這可大幅提升效能,尤其是在處理大型資料集時。

在這個程式碼研究室中,您將使用 spark-bigquery-connector,在 BigQuery 和 Spark 之間讀取及寫入資料。

5. 建立專案

前往 console.cloud.google.com 登入 Google Cloud Platform 主控台,然後建立新專案:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

接著,您需要在 Cloud 控制台中啟用帳單功能,才能使用 Google Cloud 資源。

完成這個程式碼研究室的費用不應超過數美元,但如果您決定使用更多資源,或是將資源繼續執行,則可能會增加費用。本程式碼研究室的最後一節將逐步引導您清理專案。

Google Cloud Platform 新使用者享有價值$300 美元的免費試用期

6. 設定環境

您現在將透過以下步驟設定環境:

  • 啟用 Compute Engine、Dataproc 和 BigQuery Storage API
  • 設定專案
  • 建立 Dataproc 叢集
  • 建立 Google Cloud Storage 值區

啟用 API 並設定環境

按一下 Cloud 控制台右上角的按鈕,即可開啟 Cloud Shell。

a10c47ee6ca41c54.png

Cloud Shell 載入後,請執行下列指令,啟用 Compute Engine、Dataproc 和 BigQuery Storage API:

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

設定專案的專案 ID。您可以前往專案選取頁面,然後搜尋專案。這可能與專案名稱不同。

e682e8227aa3c781.png

76d45fb295728542.png

執行下列指令,設定專案 ID

gcloud config set project <project_id>

請從這裡的清單中選擇專案的區域。例如 us-central1

gcloud config set dataproc/region <region>

為 Dataproc 叢集挑選名稱,並為其建立環境變數。

CLUSTER_NAME=<cluster_name>

建立 Dataproc 叢集

執行下列指令,建立 Dataproc 叢集:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

這個指令需要幾分鐘才能完成。指令的細節如下:

系統會開始建立 Dataproc 叢集,並使用您先前提供的名稱。使用 beta API 可啟用 Dataproc 的 Beta 版功能,例如 元件閘道

gcloud beta dataproc clusters create ${CLUSTER_NAME}

這會設定要用於工作站的機器類型

--worker-machine-type n1-standard-8

這會設定叢集中的工作站數量。

--num-workers 8

這會設定 Dataproc 的映像檔版本

--image-version 1.5-debian

這會設定要在叢集中使用的初始化動作。這裡加入了 pip 初始化動作。

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

這是要納入叢集中的中繼資料。您在此為 pip 初始化動作提供中繼資料。

--metadata 'PIP_PACKAGES=google-cloud-storage'

這會將選用元件設為在叢集上安裝。

--optional-components=ANACONDA

這會啟用元件閘道,讓您可以使用 Dataproc 的元件閘道查看常見的 UI,例如 Zeppelin、Jupyter 或 Spark 記錄

--enable-component-gateway

如要進一步瞭解 Dataproc,請參閱這個 codelab

建立 Google Cloud Storage 值區

您需要一個 Google Cloud Storage 值區來儲存工作輸出內容。為值區指定專屬名稱,然後執行下列指令建立新值區。值區名稱在所有使用者的所有 Google Cloud 專案中皆不得重複,因此您可能需要嘗試幾次,使用不同的名稱。如果您沒有收到 ServiceException,表示已成功建立值區。

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}

7. 探索性資料分析

在執行預處理作業之前,請先進一步瞭解您要處理的資料性質。為此,您將探索兩種資料探索方法。首先,您將使用 BigQuery 網頁版 UI 查看部分原始資料,然後使用 PySpark 和 Dataproc 計算每個 subreddit 的貼文數量。

使用 BigQuery 網頁版 UI

首先,請使用 BigQuery 網頁版 UI 查看資料。在 Cloud Console 的選單圖示中,向下捲動並按下「BigQuery」,即可開啟 BigQuery 網頁版 UI。

242a597d7045b4da.png

接著,請在 BigQuery 網頁版 UI 查詢編輯器中執行下列指令。這會傳回 2017 年 1 月完整的 10 列資料:

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

你可以捲動頁面,查看所有可用的欄位以及一些範例。具體來說,你會看到兩個代表每則貼文文字內容的欄位:「title」和「selftext」,其中「selftext」是貼文的內文。請注意其他資料欄,例如「created_utc」,這是貼文的 UTC 時間;「subreddit」則是貼文所在的子版。

執行 PySpark 工作

在 Cloud Shell 中執行下列指令,複製含有範例程式碼的存放區,並切換至正確的目錄:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

您可以使用 PySpark 判斷每個 Reddit 子版中有多少則貼文。您可以開啟 Cloud 編輯器,然後讀取指令碼 cloud-dataproc/codelabs/spark-bigquery,再於下一個步驟中執行:

5d965c6fb66dbd81.png

797cf71de3449bdb.png

按一下 Cloud 編輯器中的「Open Terminal」(開啟終端機) 按鈕,切換回 Cloud Shell,然後執行下列指令,執行第一個 PySpark 工作:

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

這個指令可讓您透過 Jobs API 將工作提交至 Dataproc。您在此處將工作類型指定為 pyspark。您可以提供叢集名稱、選用參數 ,以及包含工作的檔案名稱。您在此提供的參數 --jars 可讓您在工作中加入 spark-bigquery-connector。您也可以使用 --driver-log-levels root=FATAL 設定記錄輸出層級,這樣系統就會隱藏所有記錄輸出內容,只顯示錯誤。Spark 記錄檔通常會產生大量雜訊。

這項作業需要幾分鐘才能完成,最終輸出內容應如下所示:

6c185228db47bb18.png

8. 探索 Dataproc 和 Spark UI

在 Dataproc 上執行 Spark 工作時,您可以使用兩個 UI 查看工作 / 叢集的狀態。第一個是 Dataproc UI,只要按一下選單圖示,然後向下捲動至 Dataproc 即可找到。您可以在這裡查看目前可用的記憶體、待處理記憶體和工作站數量。

6f2987346d15c8e2.png

您也可以按一下「工作」分頁標籤,查看已完成的工作。只要按一下特定工作的「Job ID」,即可查看工作詳細資料,例如記錄和輸出內容。114d90129b0e4c88.png

1b2160f0f484594a.png

您也可以查看 Spark UI。在工作頁面中,按一下返回箭頭,然後點選「Web 介面」。您應該會在元件閘道下方看到幾個選項。在設定叢集時,您可以透過選用元件啟用其中許多元件。在本實驗室中,請按一下「Spark 記錄伺服器」。

5da7944327d193dc.png

6a349200289c69c1.png e63b36bdc90ff610.png

系統應該會開啟下列視窗:

8f6786760f994fe8.png

所有已完成的工作都會顯示在這裡,您可以點選任何 application_id,進一步瞭解工作相關資訊。同樣地,您也可以點選到達網頁最底部的「顯示未完成的應用程式」,查看目前正在執行的所有工作。

9. 執行補齊工作

您現在將執行工作,將資料載入記憶體、擷取必要資訊,並將輸出內容轉儲至 Google Cloud Storage 值區。您將為每則 Reddit 留言擷取「標題」、「內容」(原始文字) 和「建立時間戳記」。接著,您會將這些資料轉換為 CSV 檔案,並壓縮後載入至值區,其 URI 為 gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz。

您可以再次參閱 Cloud 編輯器,查看 cloud-dataproc/codelabs/spark-bigquery/backfill.sh程式碼,這是用於執行 cloud-dataproc/codelabs/spark-bigquery/backfill.py 中的程式碼的包裝函式指令碼。

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

您很快就會看到許多工作完成訊息。這項工作最多可能需要 15 分鐘才能完成。您也可以使用 gsutil 仔細檢查儲存空間值區,確認資料輸出是否成功。所有工作完成後,請執行下列指令:

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

您應該會看到以下的輸出內容:

a7c3c7b2e82f9fca.png

恭喜,您已成功完成 Reddit 留言資料的回填作業!如果您想瞭解如何根據這項資料建立模型,請繼續參閱 Spark-NLP 程式碼研究室。

10. 清理

如要避免在完成本快速入門後,讓系統向您的 GCP 帳戶收取不必要的費用,請按照下列步驟操作:

  1. 刪除您建立的環境 Cloud Storage 值區
  2. 刪除 Dataproc 環境

如果您是為了本程式碼研究室建立專案,也可以選擇刪除專案:

  1. 前往 GCP 主控台的「Projects頁面。
  2. 在專案清單中選取要刪除的專案,然後按一下「刪除」
  3. 在方塊中輸入專案 ID,然後按一下「Shut down」(關閉) 即可刪除專案。

授權

這項內容採用的授權為 Creative Commons 姓名標示 3.0 通用授權和 Apache 2.0 授權。