1. 總覽
本程式碼研究室將說明如何在 Google Cloud Platform 上使用 Apache Spark 和 Dataproc 建立資料處理管道。在資料科學和資料工程領域中,讀取一個儲存位置的資料、對資料執行轉換,然後寫入另一個儲存位置,是常見的用途。常見的轉換作業包括變更資料內容、移除不必要的資訊,以及變更檔案類型。
在這個程式碼研究室中,您將瞭解 Apache Spark,並使用 Dataproc 搭配 PySpark (Apache Spark 的 Python API)、BigQuery、Google Cloud Storage 和 Reddit 的資料,執行範例管道。
2. Apache Spark 簡介 (選用)
根據網站說明,「Apache Spark 是用於大規模資料處理的整合數據分析引擎」。讓您在記憶體中並行分析及處理資料,進而跨多台電腦和節點進行大量並行運算。這項工具最初於 2014 年推出,是傳統 MapReduce 的升級版,至今仍是執行大規模運算時最受歡迎的架構之一。Apache Spark 是以 Scala 編寫,後來又推出 Scala、Java、Python 和 R 的 API。其中包含許多程式庫,例如用於對資料執行 SQL 查詢的 Spark SQL、用於串流資料的 Spark Streaming、用於機器學習的 MLlib,以及用於圖形處理的 GraphX,所有程式庫都會在 Apache Spark 引擎上執行。
Spark 可以自行執行,也可以利用 Yarn、Mesos 或 Kubernetes 等資源管理服務進行調度。您將在本程式碼研究室中使用 Dataproc,該程式碼研究室會使用 Yarn。
Spark 中的資料原本會載入記憶體,並轉為 RDD (彈性分散式資料集)。自此之後,Spark 的開發作業已新增兩種新的資料欄式資料類型:有型別的資料集和無型別的資料框架。簡單來說,RDD 適合用於任何類型的資料,而資料集和資料框架則是針對表格資料進行最佳化。由於資料集僅適用於 Java 和 Scala API,我們會繼續使用 PySpark Dataframe API 進行本程式碼研究室。詳情請參閱 Apache Spark 說明文件。
3. 用途
資料工程師通常需要讓數據資料學家輕鬆存取資料。不過,資料一開始通常都很雜亂 (難以用於現況分析),因此需要經過清理才能派上用場。例如從網路擷取的資料,可能含有奇怪的編碼或多餘的 HTML 標記。
在本研究室中,您將從 BigQuery 以 Reddit 貼文的形式載入一組資料,並將其載入在 Dataproc 上代管的 Spark 叢集,擷取實用資訊,然後將處理後的資料儲存為壓縮 CSV 檔案,並儲存在 Google Cloud Storage 中。
貴公司的首席數據科學家有意讓團隊處理不同的自然語言處理問題。具體來說,他們想分析「r/food」這個子版的資料。您將建立資料傾印管道,從 2017 年 1 月至 2019 年 8 月的資料開始回填。
4. 透過 BigQuery Storage API 存取 BigQuery
隨著資料量增加,使用 tabledata.list API 方法從 BigQuery 提取資料可能會耗時且效率不彰。這個方法會傳回 JSON 物件清單,並需要逐頁依序讀取,才能讀取整個資料集。
BigQuery Storage API 採用以 RPC 為基礎的通訊協定,大幅改善 BigQuery 資料存取功能。它支援並行讀取和寫入資料,以及各種序列化格式,例如 Apache Avro 和 Apache Arrow。整體來說,這可大幅提升效能,尤其是在處理大型資料集時。
在這個程式碼研究室中,您將使用 spark-bigquery-connector,在 BigQuery 和 Spark 之間讀取及寫入資料。
5. 建立專案
前往 console.cloud.google.com 登入 Google Cloud Platform 主控台,然後建立新專案:
接著,您需要在 Cloud 控制台中啟用帳單功能,才能使用 Google Cloud 資源。
完成這個程式碼研究室的費用不應超過數美元,但如果您決定使用更多資源,或是將資源繼續執行,則可能會增加費用。本程式碼研究室的最後一節將逐步引導您清理專案。
Google Cloud Platform 新使用者享有價值$300 美元的免費試用期。
6. 設定環境
您現在將透過以下步驟設定環境:
- 啟用 Compute Engine、Dataproc 和 BigQuery Storage API
- 設定專案
- 建立 Dataproc 叢集
- 建立 Google Cloud Storage 值區
啟用 API 並設定環境
按一下 Cloud 控制台右上角的按鈕,即可開啟 Cloud Shell。
Cloud Shell 載入後,請執行下列指令,啟用 Compute Engine、Dataproc 和 BigQuery Storage API:
gcloud services enable compute.googleapis.com \
dataproc.googleapis.com \
bigquerystorage.googleapis.com
設定專案的專案 ID。您可以前往專案選取頁面,然後搜尋專案。這可能與專案名稱不同。
執行下列指令,設定專案 ID:
gcloud config set project <project_id>
請從這裡的清單中選擇專案的區域。例如 us-central1
。
gcloud config set dataproc/region <region>
為 Dataproc 叢集挑選名稱,並為其建立環境變數。
CLUSTER_NAME=<cluster_name>
建立 Dataproc 叢集
執行下列指令,建立 Dataproc 叢集:
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--worker-machine-type n1-standard-8 \
--num-workers 8 \
--image-version 1.5-debian \
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
--metadata 'PIP_PACKAGES=google-cloud-storage' \
--optional-components=ANACONDA \
--enable-component-gateway
這個指令需要幾分鐘才能完成。指令的細節如下:
系統會開始建立 Dataproc 叢集,並使用您先前提供的名稱。使用 beta
API 可啟用 Dataproc 的 Beta 版功能,例如 元件閘道。
gcloud beta dataproc clusters create ${CLUSTER_NAME}
這會設定要用於工作站的機器類型。
--worker-machine-type n1-standard-8
這會設定叢集中的工作站數量。
--num-workers 8
這會設定 Dataproc 的映像檔版本。
--image-version 1.5-debian
這會設定要在叢集中使用的初始化動作。這裡加入了 pip 初始化動作。
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh
這是要納入叢集中的中繼資料。您在此為 pip
初始化動作提供中繼資料。
--metadata 'PIP_PACKAGES=google-cloud-storage'
這會將選用元件設為在叢集上安裝。
--optional-components=ANACONDA
這會啟用元件閘道,讓您可以使用 Dataproc 的元件閘道查看常見的 UI,例如 Zeppelin、Jupyter 或 Spark 記錄
--enable-component-gateway
如要進一步瞭解 Dataproc,請參閱這個 codelab。
建立 Google Cloud Storage 值區
您需要一個 Google Cloud Storage 值區來儲存工作輸出內容。為值區指定專屬名稱,然後執行下列指令建立新值區。值區名稱在所有使用者的所有 Google Cloud 專案中皆不得重複,因此您可能需要嘗試幾次,使用不同的名稱。如果您沒有收到 ServiceException
,表示已成功建立值區。
BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}
7. 探索性資料分析
在執行預處理作業之前,請先進一步瞭解您要處理的資料性質。為此,您將探索兩種資料探索方法。首先,您將使用 BigQuery 網頁版 UI 查看部分原始資料,然後使用 PySpark 和 Dataproc 計算每個 subreddit 的貼文數量。
使用 BigQuery 網頁版 UI
首先,請使用 BigQuery 網頁版 UI 查看資料。在 Cloud Console 的選單圖示中,向下捲動並按下「BigQuery」,即可開啟 BigQuery 網頁版 UI。
接著,請在 BigQuery 網頁版 UI 查詢編輯器中執行下列指令。這會傳回 2017 年 1 月完整的 10 列資料:
select * from fh-bigquery.reddit_posts.2017_01 limit 10;
你可以捲動頁面,查看所有可用的欄位以及一些範例。具體來說,你會看到兩個代表每則貼文文字內容的欄位:「title」和「selftext」,其中「selftext」是貼文的內文。請注意其他資料欄,例如「created_utc」,這是貼文的 UTC 時間;「subreddit」則是貼文所在的子版。
執行 PySpark 工作
在 Cloud Shell 中執行下列指令,複製含有範例程式碼的存放區,並切換至正確的目錄:
cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
您可以使用 PySpark 判斷每個 Reddit 子版中有多少則貼文。您可以開啟 Cloud 編輯器,然後讀取指令碼 cloud-dataproc/codelabs/spark-bigquery
,再於下一個步驟中執行:
按一下 Cloud 編輯器中的「Open Terminal」(開啟終端機) 按鈕,切換回 Cloud Shell,然後執行下列指令,執行第一個 PySpark 工作:
cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
--jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
--driver-log-levels root=FATAL \
counts_by_subreddit.py
這個指令可讓您透過 Jobs API 將工作提交至 Dataproc。您在此處將工作類型指定為 pyspark
。您可以提供叢集名稱、選用參數 ,以及包含工作的檔案名稱。您在此提供的參數 --jars
可讓您在工作中加入 spark-bigquery-connector
。您也可以使用 --driver-log-levels root=FATAL
設定記錄輸出層級,這樣系統就會隱藏所有記錄輸出內容,只顯示錯誤。Spark 記錄檔通常會產生大量雜訊。
這項作業需要幾分鐘才能完成,最終輸出內容應如下所示:
8. 探索 Dataproc 和 Spark UI
在 Dataproc 上執行 Spark 工作時,您可以使用兩個 UI 查看工作 / 叢集的狀態。第一個是 Dataproc UI,只要按一下選單圖示,然後向下捲動至 Dataproc 即可找到。您可以在這裡查看目前可用的記憶體、待處理記憶體和工作站數量。
您也可以按一下「工作」分頁標籤,查看已完成的工作。只要按一下特定工作的「Job ID」,即可查看工作詳細資料,例如記錄和輸出內容。
您也可以查看 Spark UI。在工作頁面中,按一下返回箭頭,然後點選「Web 介面」。您應該會在元件閘道下方看到幾個選項。在設定叢集時,您可以透過選用元件啟用其中許多元件。在本實驗室中,請按一下「Spark 記錄伺服器」。
系統應該會開啟下列視窗:
所有已完成的工作都會顯示在這裡,您可以點選任何 application_id,進一步瞭解工作相關資訊。同樣地,您也可以點選到達網頁最底部的「顯示未完成的應用程式」,查看目前正在執行的所有工作。
9. 執行補齊工作
您現在將執行工作,將資料載入記憶體、擷取必要資訊,並將輸出內容轉儲至 Google Cloud Storage 值區。您將為每則 Reddit 留言擷取「標題」、「內容」(原始文字) 和「建立時間戳記」。接著,您會將這些資料轉換為 CSV 檔案,並壓縮後載入至值區,其 URI 為 gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz。
您可以再次參閱 Cloud 編輯器,查看 cloud-dataproc/codelabs/spark-bigquery/backfill.sh
的程式碼,這是用於執行 cloud-dataproc/codelabs/spark-bigquery/backfill.py
中的程式碼的包裝函式指令碼。
cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}
您很快就會看到許多工作完成訊息。這項工作最多可能需要 15 分鐘才能完成。您也可以使用 gsutil 仔細檢查儲存空間值區,確認資料輸出是否成功。所有工作完成後,請執行下列指令:
gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz
您應該會看到以下的輸出內容:
恭喜,您已成功完成 Reddit 留言資料的回填作業!如果您想瞭解如何根據這項資料建立模型,請繼續參閱 Spark-NLP 程式碼研究室。
10. 清理
如要避免在完成本快速入門後,讓系統向您的 GCP 帳戶收取不必要的費用,請按照下列步驟操作:
如果您是為了本程式碼研究室建立專案,也可以選擇刪除專案:
- 前往 GCP 主控台的「Projects」頁面。
- 在專案清單中選取要刪除的專案,然後按一下「刪除」。
- 在方塊中輸入專案 ID,然後按一下「Shut down」(關閉) 即可刪除專案。
授權
這項內容採用的授權為 Creative Commons 姓名標示 3.0 通用授權和 Apache 2.0 授權。