1. 總覽
本程式碼研究室將逐步說明如何在 Google Cloud Platform 上,使用 Apache Spark 與 Dataproc 建立資料處理管道。在數據資料學和資料工程領域中,經常需要從某個儲存位置讀取資料、在儲存位置執行轉換,以及將資料寫入另一個儲存空間位置。常見的轉換包括變更資料內容、去除不必要的資訊,以及變更檔案類型。
在本程式碼研究室中,您將瞭解 Apache Spark、使用 Dataproc 搭配 PySpark (Apache Spark 的 Python API)、BigQuery、Google Cloud Storage 以及 Reddit 的資料執行範例管道。
2. Apache Spark 簡介 (選用)
根據網站的內容:「Apache Spark 是用於處理大規模資料的整合式數據分析引擎。這項服務可讓您在記憶體內並行分析及處理資料,以便跨多個不同的機器和節點進行大規模平行運算。它最初是在 2014 年升級為傳統 MapReduce 時發行,至今仍是執行大規模運算最常見的架構之一。Apache Spark 是以 Scala 編寫,隨後又有 Scala、Java、Python 和 R 的 API。其中包含多種程式庫,例如用於對資料執行 SQL 查詢的 Spark SQL、用於串流資料的 Spark Streaming、適用於機器學習的 MLlib,以及用於圖形處理的 GraphX (這些程式庫都是在 Apache Spark 引擎上執行)。
Spark 可自行執行,也可以使用 Yarn、Mesos 或 Kubernetes 等資源管理服務進行資源調度。您將在本程式碼研究室中使用 Dataproc,這個程式碼研究室運用了 Yarn。
Spark 中的資料一開始會載入到記憶體中,稱為 RDD 或具彈性的分散式資料集。在 Spark 上進行開發後,新增了兩種新的資料欄式類型資料類型:資料集 (類型) 和 DataFrame (無類型) 的 DataFrame。大致上來說,彈性分散式資料集 (RDD) 適用於任何類型的資料,而資料集和 DataFrames 已針對表格資料進行最佳化。由於資料集僅適用於 Java 和 Scala API,因此我們將繼續在本程式碼研究室中使用 PySpark DataFrame API。詳情請參閱 Apache Spark 說明文件。
3. 用途
資料工程師通常需要將資料方便數據資料學家輕鬆存取,不過,資料最初通常髒亂,且難以在目前的狀態下用於數據分析,且需要先清理才能發揮大量效用。例如從網路抓取的資料可能包含奇怪的編碼或多餘的 HTML 標記。
在本研究室中,您會將一組以 Reddit 格式發布的資料,載入 Dataproc 託管的 Spark 叢集、擷取實用資訊,並將處理後的資料以 CSV 壓縮檔的形式儲存在 Google Cloud Storage 中。
貴公司的數據資料學家希望團隊協助處理不同的自然語言處理問題。具體而言,他們想要分析「r/food」下的資料。從 2017 年 1 月至 2019 年 8 月的補充作業開始,您將建立資料轉儲管道。
4. 透過 BigQuery Storage API 使用 BigQuery
使用 tabledata.list API 方法從 BigQuery 提取資料可能會耗費大量時間,而且效率會隨著資料量的成長而提高。這個方法會傳回 JSON 物件清單,且需要一次讀取一頁才能讀取整個資料集。
BigQuery Storage API 使用以遠端程序呼叫 (RPC) 為基礎的通訊協定,可大幅改善存取 BigQuery 資料的方式。同時支援平行讀取和寫入資料,以及各種序列化格式,例如 Apache Avro 和 Apache Arrow。整體來說,這可大幅改善成效,特別是在資料集較大的情況下。
在本程式碼研究室中,您將使用 spark-bigquery-connector 在 BigQuery 和 Spark 之間讀取和寫入資料。
5. 建立專案
前往 console.cloud.google.com 登入 Google Cloud Platform 控制台,並建立新專案:
接下來,您需要在 Cloud 控制台中啟用計費功能,才能使用 Google Cloud 資源。
執行本程式碼研究室的成本不應超過數美元,但如果您決定使用更多資源或讓資源繼續運作,費用會增加。本程式碼研究室的最後一個部分會引導您清除專案。
新使用者符合 $300 美元免費試用資格的 Google Cloud Platform。
6. 設定環境
現在您可以按照以下方式完成環境設定:
- 啟用 Compute Engine、Dataproc 和 BigQuery Storage API
- 調整專案設定
- 建立 Dataproc 叢集
- 建立 Google Cloud Storage 值區
啟用 API 並設定環境
按下 Cloud 控制台右上角的按鈕,開啟 Cloud Shell。
Cloud Shell 載入後,執行下列指令,啟用 Compute Engine、Dataproc 和 BigQuery Storage API:
gcloud services enable compute.googleapis.com \
dataproc.googleapis.com \
bigquerystorage.googleapis.com
設定專案的專案 ID。請前往專案選取頁面搜尋專案,即可找到這組 ID。這可能與專案名稱不同。
執行下列指令以設定專案 ID:
gcloud config set project <project_id>
如要設定專案的區域,請從這裡的清單中選擇區域。例如 us-central1
。
gcloud config set dataproc/region <region>
為 Dataproc 叢集命名,並為叢集建立環境變數。
CLUSTER_NAME=<cluster_name>
正在建立 Dataproc 叢集
請執行下列指令來建立 Dataproc 叢集:
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--worker-machine-type n1-standard-8 \
--num-workers 8 \
--image-version 1.5-debian \
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
--metadata 'PIP_PACKAGES=google-cloud-storage' \
--optional-components=ANACONDA \
--enable-component-gateway
這個指令會在幾分鐘內執行完畢。如要細分指令:
這項操作會使用您先前提供的名稱建立 Dataproc 叢集。使用 beta
API 會啟用 Dataproc 的 Beta 版功能,例如元件閘道。
gcloud beta dataproc clusters create ${CLUSTER_NAME}
這會設定工作站使用的機器類型。
--worker-machine-type n1-standard-8
這會設定叢集的工作站數量。
--num-workers 8
這會設定 Dataproc 的映像檔版本。
--image-version 1.5-debian
這會設定要在叢集使用的初始化動作。您在此納入 pip 初始化動作。
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh
這是要納入叢集的中繼資料。在此,您將提供 pip
初始化動作的中繼資料。
--metadata 'PIP_PACKAGES=google-cloud-storage'
這會設定要在叢集上安裝的選用元件。
--optional-components=ANACONDA
這項操作會啟用元件閘道,讓您使用 Dataproc 的元件閘道查看常用 UI,例如 Zeppelin、Jupyter 或 Spark 記錄
--enable-component-gateway
如要深入瞭解 Dataproc,請參閱這個程式碼研究室。
建立 Google Cloud Storage 值區
您必須為工作輸出建立 Google Cloud Storage 值區。為值區設定不重複的名稱,然後執行下列指令來建立新的值區。在所有 Google Cloud 專案中,所有使用者的值區名稱都不重複,因此您可能需要嘗試使用不同名稱數次。如果您沒有收到 ServiceException
,代表已成功建立值區。
BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}
7. 探索性資料分析
執行預先處理作業之前,請先進一步瞭解所處理資料的性質。為此,你將瞭解兩種資料探索方法。首先,您將使用 BigQuery 網頁版 UI 查看一些原始資料,接著使用 PySpark 和 Dataproc 計算每個子 reddit 的文章數量。
使用 BigQuery 網頁版 UI
請先使用 BigQuery 網頁版 UI 查看資料。在 Cloud 控制台的選單圖示中,向下捲動並按下「BigQuery」開啟 BigQuery 網頁版 UI。
接著,在 BigQuery 網頁版 UI 查詢編輯器中執行下列指令。這樣會傳回 2017 年 1 月之後的 10 列完整資料:
select * from fh-bigquery.reddit_posts.2017_01 limit 10;
您可以捲動頁面,查看所有可用欄及一些範例。尤其是,您會看到兩個欄位,分別代表每篇文章的文字內容:「title」和「selftext」即為文章內文。請一併留意其他資料欄,如「created_utc」也就是發布文章的時間和「subreddit」該則貼文所屬的子來源。
執行 PySpark 工作
在 Cloud Shell 中執行下列指令,使用程式碼範例複製存放區,並將 cd 複製到正確的目錄:
cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
您可以利用 PySpark 來判斷每個 subreddit 的貼文數。您可以開啟 Cloud 編輯器並讀取指令碼 cloud-dataproc/codelabs/spark-bigquery
,然後在下一個步驟中執行:
按一下 [開啟終端機]按鈕,返回 Cloud Shell 並執行下列指令,執行第一項 PySpark 工作:
cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
--jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
--driver-log-levels root=FATAL \
counts_by_subreddit.py
這個指令可讓您透過 Jobs API 將工作提交至 Dataproc。在這裡,您要將工作類型指定為 pyspark
。您可以提供叢集名稱、選用參數 和包含工作的檔案名稱。您在此提供 --jars
參數,可讓您在工作中加入 spark-bigquery-connector
。您也可以使用 --driver-log-levels root=FATAL
設定記錄輸出層級,這會隱藏所有記錄輸出內容,但發生錯誤時除外。而 Spark 記錄檔往往較為雜訊。
這項作業應該會在幾分鐘內執行,最終輸出內容應如下所示:
8. 探索 Dataproc 和 Spark UI
在 Dataproc 上執行 Spark 工作時,您可以使用兩個 UI 檢查工作 / 叢集的狀態。第一種是 Dataproc UI,點選「選單」圖示並向下捲動至「Dataproc」即可找到這個 UI您可以在這裡查看目前的可用記憶體、待處理記憶體和工作站數量。
您也可以按一下「工作」分頁標籤,查看已完成的工作。按一下特定工作的工作 ID,即可查看工作記錄和輸出內容等工作詳細資料。
您也可以查看 Spark UI。在工作頁面中,按一下返回箭頭,然後點選「網路介面」。元件閘道下方應會顯示多個選項。許多方法可以在設定叢集時透過選用元件啟用。在這個研究室中,點選「Spark History Server」(Spark 記錄伺服器)
畫面上應該會開啟以下視窗:
這裡會顯示所有已完成的工作,只要按一下 application_id 即可進一步瞭解工作。同樣地,您可以按一下 [顯示未完成的應用程式]即可查看目前執行中的所有工作。
9. 執行補充工作
您現在可以執行工作,將資料載入記憶體、擷取必要資訊,並將輸出內容轉儲到 Google Cloud Storage 值區。您將擷取「標題」、「內文」(原始文字) 和「timestamp created」。接著,您需要將這項資料轉換為 CSV 檔案並進行壓縮,再將 URI 載入值區, URI 為 gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz。
您再次參照 Cloud 編輯器來讀取 cloud-dataproc/codelabs/spark-bigquery/backfill.sh
的程式碼,這是用於在 cloud-dataproc/codelabs/spark-bigquery/backfill.py
中執行程式碼的包裝函式指令碼。
cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}
您應該很快就會看到一系列的工作完成訊息。這項工作最多可能需要 15 分鐘才能完成。您也可以使用 gsutil 再次檢查儲存空間值區,確認資料輸出成功。所有工作都完成後,請執行下列指令:
gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz
您應該會看到以下的輸出內容:
恭喜,您已成功完成 Reddit 留言資料的補充作業!如要瞭解如何運用這些資料建立模型,請繼續前往 Spark-NLP 程式碼研究室。
10. 清除
完成本快速入門導覽課程後,如要避免系統向您的 GCP 帳戶收取不必要的費用,請按照下列步驟操作:
- 刪除環境和您建立的 Cloud Storage 值區
- 刪除 Dataproc 環境。
如果您只針對本程式碼研究室建立專案,也可以選擇刪除專案:
- 在 GCP 控制台中,前往「專案」頁面。
- 在專案清單中,選取要刪除的專案,然後按一下「Delete」(刪除)。
- 在方塊中輸入專案 ID,然後按一下「Shut down」(關閉) 即可刪除專案。
授權
本作品採用創用 CC 姓名標示 3.0 通用授權,以及 Apache 2.0 授權。