在 Dataproc 中使用 PySpark 預先處理 BigQuery 資料

1. 總覽

本程式碼研究室將逐步說明如何在 Google Cloud Platform 上,使用 Apache SparkDataproc 建立資料處理管道。在數據資料學和資料工程領域中,經常需要從某個儲存位置讀取資料、在儲存位置執行轉換,以及將資料寫入另一個儲存空間位置。常見的轉換包括變更資料內容、去除不必要的資訊,以及變更檔案類型。

在本程式碼研究室中,您將瞭解 Apache Spark、使用 Dataproc 搭配 PySpark (Apache Spark 的 Python API)、BigQueryGoogle Cloud Storage 以及 Reddit 的資料執行範例管道。

2. Apache Spark 簡介 (選用)

根據網站的內容:「Apache Spark 是用於處理大規模資料的整合式數據分析引擎。這項服務可讓您在記憶體內並行分析及處理資料,以便跨多個不同的機器和節點進行大規模平行運算。它最初是在 2014 年升級為傳統 MapReduce 時發行,至今仍是執行大規模運算最常見的架構之一。Apache Spark 是以 Scala 編寫,隨後又有 Scala、Java、Python 和 R 的 API。其中包含多種程式庫,例如用於對資料執行 SQL 查詢的 Spark SQL、用於串流資料的 Spark Streaming、適用於機器學習的 MLlib,以及用於圖形處理的 GraphX (這些程式庫都是在 Apache Spark 引擎上執行)。

32add0b6a47bafbc.png

Spark 可自行執行,也可以使用 YarnMesosKubernetes 等資源管理服務進行資源調度。您將在本程式碼研究室中使用 Dataproc,這個程式碼研究室運用了 Yarn。

Spark 中的資料一開始會載入到記憶體中,稱為 RDD 或具彈性的分散式資料集。在 Spark 上進行開發後,新增了兩種新的資料欄式類型資料類型:資料集 (類型) 和 DataFrame (無類型) 的 DataFrame。大致上來說,彈性分散式資料集 (RDD) 適用於任何類型的資料,而資料集和 DataFrames 已針對表格資料進行最佳化。由於資料集僅適用於 Java 和 Scala API,因此我們將繼續在本程式碼研究室中使用 PySpark DataFrame API。詳情請參閱 Apache Spark 說明文件

3. 用途

資料工程師通常需要將資料方便數據資料學家輕鬆存取,不過,資料最初通常髒亂,且難以在目前的狀態下用於數據分析,且需要先清理才能發揮大量效用。例如從網路抓取的資料可能包含奇怪的編碼或多餘的 HTML 標記。

在本研究室中,您會將一組以 Reddit 格式發布的資料,載入 Dataproc 託管的 Spark 叢集、擷取實用資訊,並將處理後的資料以 CSV 壓縮檔的形式儲存在 Google Cloud Storage 中。

be2a4551ece63bfc.png

貴公司的數據資料學家希望團隊協助處理不同的自然語言處理問題。具體而言,他們想要分析「r/food」下的資料。從 2017 年 1 月至 2019 年 8 月的補充作業開始,您將建立資料轉儲管道。

4. 透過 BigQuery Storage API 使用 BigQuery

使用 tabledata.list API 方法從 BigQuery 提取資料可能會耗費大量時間,而且效率會隨著資料量的成長而提高。這個方法會傳回 JSON 物件清單,且需要一次讀取一頁才能讀取整個資料集。

BigQuery Storage API 使用以遠端程序呼叫 (RPC) 為基礎的通訊協定,可大幅改善存取 BigQuery 資料的方式。同時支援平行讀取和寫入資料,以及各種序列化格式,例如 Apache AvroApache Arrow。整體來說,這可大幅改善成效,特別是在資料集較大的情況下。

在本程式碼研究室中,您將使用 spark-bigquery-connector 在 BigQuery 和 Spark 之間讀取和寫入資料。

5. 建立專案

前往 console.cloud.google.com 登入 Google Cloud Platform 控制台,並建立新專案:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

接下來,您需要在 Cloud 控制台中啟用計費功能,才能使用 Google Cloud 資源。

執行本程式碼研究室的成本不應超過數美元,但如果您決定使用更多資源或讓資源繼續運作,費用會增加。本程式碼研究室的最後一個部分會引導您清除專案。

新使用者符合 $300 美元免費試用資格的 Google Cloud Platform。

6. 設定環境

現在您可以按照以下方式完成環境設定:

  • 啟用 Compute Engine、Dataproc 和 BigQuery Storage API
  • 調整專案設定
  • 建立 Dataproc 叢集
  • 建立 Google Cloud Storage 值區

啟用 API 並設定環境

按下 Cloud 控制台右上角的按鈕,開啟 Cloud Shell。

a10c47ee6ca41c54.png

Cloud Shell 載入後,執行下列指令,啟用 Compute Engine、Dataproc 和 BigQuery Storage API:

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

設定專案的專案 ID。請前往專案選取頁面搜尋專案,即可找到這組 ID。這可能與專案名稱不同。

e682e8227aa3c781.png

76d45fb295728542.png

執行下列指令以設定專案 ID

gcloud config set project <project_id>

如要設定專案的區域,請從這裡的清單中選擇區域。例如 us-central1

gcloud config set dataproc/region <region>

為 Dataproc 叢集命名,並為叢集建立環境變數。

CLUSTER_NAME=<cluster_name>

正在建立 Dataproc 叢集

請執行下列指令來建立 Dataproc 叢集:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

這個指令會在幾分鐘內執行完畢。如要細分指令:

這項操作會使用您先前提供的名稱建立 Dataproc 叢集。使用 beta API 會啟用 Dataproc 的 Beta 版功能,例如元件閘道

gcloud beta dataproc clusters create ${CLUSTER_NAME}

這會設定工作站使用的機器類型

--worker-machine-type n1-standard-8

這會設定叢集的工作站數量。

--num-workers 8

這會設定 Dataproc 的映像檔版本

--image-version 1.5-debian

這會設定要在叢集使用的初始化動作。您在此納入 pip 初始化動作。

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

這是要納入叢集的中繼資料。在此,您將提供 pip 初始化動作的中繼資料。

--metadata 'PIP_PACKAGES=google-cloud-storage'

這會設定要在叢集上安裝的選用元件

--optional-components=ANACONDA

這項操作會啟用元件閘道,讓您使用 Dataproc 的元件閘道查看常用 UI,例如 Zeppelin、Jupyter 或 Spark 記錄

--enable-component-gateway

如要深入瞭解 Dataproc,請參閱這個程式碼研究室

建立 Google Cloud Storage 值區

您必須為工作輸出建立 Google Cloud Storage 值區。為值區設定不重複的名稱,然後執行下列指令來建立新的值區。在所有 Google Cloud 專案中,所有使用者的值區名稱都不重複,因此您可能需要嘗試使用不同名稱數次。如果您沒有收到 ServiceException,代表已成功建立值區。

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}

7. 探索性資料分析

執行預先處理作業之前,請先進一步瞭解所處理資料的性質。為此,你將瞭解兩種資料探索方法。首先,您將使用 BigQuery 網頁版 UI 查看一些原始資料,接著使用 PySpark 和 Dataproc 計算每個子 reddit 的文章數量。

使用 BigQuery 網頁版 UI

請先使用 BigQuery 網頁版 UI 查看資料。在 Cloud 控制台的選單圖示中,向下捲動並按下「BigQuery」開啟 BigQuery 網頁版 UI。

242a597d7045b4da.png

接著,在 BigQuery 網頁版 UI 查詢編輯器中執行下列指令。這樣會傳回 2017 年 1 月之後的 10 列完整資料:

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

您可以捲動頁面,查看所有可用欄及一些範例。尤其是,您會看到兩個欄位,分別代表每篇文章的文字內容:「title」和「selftext」即為文章內文。請一併留意其他資料欄,如「created_utc」也就是發布文章的時間和「subreddit」該則貼文所屬的子來源。

執行 PySpark 工作

在 Cloud Shell 中執行下列指令,使用程式碼範例複製存放區,並將 cd 複製到正確的目錄:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

您可以利用 PySpark 來判斷每個 subreddit 的貼文數。您可以開啟 Cloud 編輯器並讀取指令碼 cloud-dataproc/codelabs/spark-bigquery,然後在下一個步驟中執行:

5d965c6fb66dbd81.png

797cf71de3449bdb.png

按一下 [開啟終端機]按鈕,返回 Cloud Shell 並執行下列指令,執行第一項 PySpark 工作:

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

這個指令可讓您透過 Jobs API 將工作提交至 Dataproc。在這裡,您要將工作類型指定為 pyspark。您可以提供叢集名稱、選用參數 和包含工作的檔案名稱。您在此提供 --jars 參數,可讓您在工作中加入 spark-bigquery-connector。您也可以使用 --driver-log-levels root=FATAL 設定記錄輸出層級,這會隱藏所有記錄輸出內容,但發生錯誤時除外。而 Spark 記錄檔往往較為雜訊。

這項作業應該會在幾分鐘內執行,最終輸出內容應如下所示:

6c185228db47bb18.png

8. 探索 Dataproc 和 Spark UI

在 Dataproc 上執行 Spark 工作時,您可以使用兩個 UI 檢查工作 / 叢集的狀態。第一種是 Dataproc UI,點選「選單」圖示並向下捲動至「Dataproc」即可找到這個 UI您可以在這裡查看目前的可用記憶體、待處理記憶體和工作站數量。

6f2987346d15c8e2.png

您也可以按一下「工作」分頁標籤,查看已完成的工作。按一下特定工作的工作 ID,即可查看工作記錄和輸出內容等工作詳細資料。114d90129b0e4c88.png

1b2160f0f484594a.png

您也可以查看 Spark UI。在工作頁面中,按一下返回箭頭,然後點選「網路介面」。元件閘道下方應會顯示多個選項。許多方法可以在設定叢集時透過選用元件啟用。在這個研究室中,點選「Spark History Server」(Spark 記錄伺服器)

5da7944327d193dc.png

6a349200289c69c1.png e63b36bdc90ff610.png

畫面上應該會開啟以下視窗:

8f6786760f994fe8.png

這裡會顯示所有已完成的工作,只要按一下 application_id 即可進一步瞭解工作。同樣地,您可以按一下 [顯示未完成的應用程式]即可查看目前執行中的所有工作。

9. 執行補充工作

您現在可以執行工作,將資料載入記憶體、擷取必要資訊,並將輸出內容轉儲到 Google Cloud Storage 值區。您將擷取「標題」、「內文」(原始文字) 和「timestamp created」。接著,您需要將這項資料轉換為 CSV 檔案並進行壓縮,再將 URI 載入值區, URI 為 gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz。

您再次參照 Cloud 編輯器來讀取 cloud-dataproc/codelabs/spark-bigquery/backfill.sh程式碼,這是用於在 cloud-dataproc/codelabs/spark-bigquery/backfill.py 中執行程式碼的包裝函式指令碼。

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

您應該很快就會看到一系列的工作完成訊息。這項工作最多可能需要 15 分鐘才能完成。您也可以使用 gsutil 再次檢查儲存空間值區,確認資料輸出成功。所有工作都完成後,請執行下列指令:

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

您應該會看到以下的輸出內容:

a7c3c7b2e82f9fca.png

恭喜,您已成功完成 Reddit 留言資料的補充作業!如要瞭解如何運用這些資料建立模型,請繼續前往 Spark-NLP 程式碼研究室。

10. 清除

完成本快速入門導覽課程後,如要避免系統向您的 GCP 帳戶收取不必要的費用,請按照下列步驟操作:

  1. 刪除環境和您建立的 Cloud Storage 值區
  2. 刪除 Dataproc 環境

如果您只針對本程式碼研究室建立專案,也可以選擇刪除專案:

  1. 在 GCP 控制台中,前往「專案頁面。
  2. 在專案清單中,選取要刪除的專案,然後按一下「Delete」(刪除)
  3. 在方塊中輸入專案 ID,然後按一下「Shut down」(關閉) 即可刪除專案。

授權

本作品採用創用 CC 姓名標示 3.0 通用授權,以及 Apache 2.0 授權。