1. 簡介 - Google Dataproc
Dataproc 是具備高擴充性的全代管服務,能執行 Apache Spark、Apache Flink、Presto 及其他許多開放原始碼工具和架構,使用 Dataproc 翻新資料湖泊、ETL / ELT,以及安全數據資料學服務,無論是全球規模的資料都適用。Dataproc 也與多項 Google Cloud 服務完全整合,包括 BigQuery、Cloud Storage、Vertex AI 和 Dataplex。
Dataproc 提供三種變種版本:
- Dataproc Serverless 可讓您在不設定基礎架構和自動調度資源的情況下執行 PySpark 工作。Dataproc Serverless 支援 PySpark 批次工作負載和工作階段 / 筆記本。
- Google Compute Engine 上的 Dataproc 除了 Flink 和 Presto 等開放原始碼工具外,您還可以管理 Hadoop YARN 叢集來處理 YARN 型 Spark 工作負載。您可以視需求自訂雲端式叢集,視需求增加垂直或水平資源調度,包括自動調度資源。
- Google Kubernetes Engine 上的 Dataproc 可讓您在 GKE 基礎架構中設定 Dataproc 虛擬叢集,以便提交 Spark、PySpark、SparkR 或 Spark SQL 工作。
2. 在 Google Cloud 虛擬私有雲中建立 Dataproc 叢集
在這個步驟中,您將使用 Google Cloud 控制台在 Google Cloud 上建立 Dataproc 叢集。
首先,請在控制台中啟用 Dataproc 服務 API。啟用後,搜尋「Dataproc」按一下搜尋列中的「建立叢集」。
選取「Compute Engine 叢集」,以 Google Compute Engine(GCE) VM 做為執行 Dataproc 叢集的基礎架構。
您現在已進入「叢集建立」頁面。
這個頁面上的內容:
- 提供不重複的叢集名稱。
- 選取特定地區。您也可以選取可用區,Dataproc 提供自動選擇可用區的功能。在本程式碼研究室中,請選取「us-central1」和「us-central1-c」
- 選取「標準」叢集類型這可確保只有一個主要節點。
- 在「設定節點」分頁中,確認建立的工作站數量會是兩個。
- 在「自訂叢集」專區中,勾選「啟用元件閘道」旁的方塊。讓您存取叢集中的網路介面,包括 Spark UI、Yarn Node Manager 和 Jupyter 筆記本。
- 在「選用元件」中,選取「Jupyter Notebook」。系統會使用 Jupyter 筆記本伺服器設定叢集。
- 其他設定維持不變,點選「建立叢集」。
這麼做會啟動 Dataproc 叢集。
3. 啟動叢集並透過 SSH 連線至叢集
當叢集狀態變更為「執行中」後,請在 Dataproc 控制台中按一下叢集名稱。
按一下「VM Instance」分頁標籤,查看叢集的主節點和兩個工作站節點。
按一下主要節點旁邊的「SSH」,登入主要節點。
執行 hdfs 指令來查看目錄結構。
hadoop_commands_example
sudo hadoop fs -ls /
sudo hadoop version
sudo hadoop fs -mkdir /test51
sudo hadoop fs -ls /
4. 網路介面和元件閘道
在 Dataproc 叢集控制台中按一下叢集名稱,然後點選「網站介面」分頁標籤。
這裡會顯示可用的網路介面,包括 Jupyter。按一下「Jupyter」開啟 Jupyter 筆記本。您可以使用這個選項,在儲存在 GCS 的 PySpark 中建立筆記本。,在 Google Cloud Storage 中儲存筆記本,並開啟 PySpark 筆記本,以用於本程式碼研究室。
5. 監控與觀測 Spark 工作
啟動並執行 Dataproc 叢集後,請建立 PySpark 批次工作,並將工作提交到 Dataproc 叢集。
建立 Google Cloud Storage (GCS) 值區以儲存 PySpark 指令碼。請務必在與 Dataproc 叢集相同的區域建立值區。
現在 GCS 值區已建立完成,請將下列檔案複製到這個值區。
https://raw.githubusercontent.com/diptimanr/spark-on-gce/main/test-spark-1.py
這個指令碼會建立範例 Spark DataFrame,並以 Hive 資料表的形式寫入。
hive_job.py
from pyspark.sql import SparkSession
from datetime import datetime, date
from pyspark.sql import Row
spark = SparkSession.builder.master("local").enableHiveSupport().getOrCreate()
df = spark.createDataFrame([ (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
(2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)), (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
print("..... Writing data .....")
df.write.mode("overwrite").saveAsTable("test_table_1")
print("..... Complete .....")
在 Dataproc 中將這個指令碼提交為 Spark 批次工作。按一下左側導覽選單中的「工作」,然後點選「提交工作」。
提供「工作 ID」和「地區」。選取叢集,並提供所複製 Spark 指令碼的 GCS 位置。這項工作會在 Dataproc 中做為 Spark 批次工作執行。
在「Properties」(屬性) 底下,新增 spark.submit.deployMode
鍵和 client
值,確保驅動程式是在 Dataproc 主要節點 (而非工作站節點) 中執行。按一下「Submit」,將批次工作提交至 Dataproc。
Spark 指令碼會建立 DataFrame,並寫入 Hive 資料表 test_table_1
。
工作成功執行後,即可在「Monitoring」分頁下查看主控台的輸出陳述式。
Hive 資料表已建立完成,請提交另一個 Hive 查詢工作來選取資料表內容,並在控制台中顯示。
建立具有下列屬性的另一項工作:
請注意,「Job Type」(工作類型) 已設為「Hive」,查詢來源類型為「Query Text」(查詢文字),這表示我們會在「Query Text」(查詢文字) 文字方塊中寫入整個 HiveQL 陳述式。
提交工作,並保留其餘參數做為預設值。
請注意 HiveQL 如何選取所有記錄和顯示主控台。
6. 自動調度資源
「自動調度資源」是估算「右側」的工作工作負載的叢集工作站節點數量
Dataproc 自動調度資源政策 API 提供自動管理叢集資源機制,並能自動調度叢集工作站 VM。自動調度資源政策是可重複使用的設定,當中說明採用自動調度資源政策的叢集工作站應如何調度資源。並定義資源調度界限、頻率和積極程度,讓您可精細控管整個叢集生命週期中的叢集資源。
Dataproc 自動調度資源政策是以 YAML 檔案編寫。這些 YAML 檔案會透過 CLI 指令傳送來建立叢集,也可能是在透過 Cloud 控制台建立叢集時,從 GCS 值區選取。
以下是 Dataproc 自動調度資源政策的範例:
policy.yaml
workerConfig:
minInstances: 10
maxInstances: 10
secondaryWorkerConfig:
maxInstances: 50
basicAlgorithm:
cooldownPeriod: 4m
yarnConfig:
scaleUpFactor: 0.05
scaleDownFactor: 1.0
gracefulDecommissionTimeout: 1h
7. 設定 Dataproc 選用元件
這麼做會啟動 Dataproc 叢集。
建立 Dataproc 叢集時,系統會自動在叢集上安裝標準的 Apache Hadoop 生態系統元件 (請參閱 Dataproc 版本清單)。建立叢集時,您可以在叢集中安裝其他元件,這些元件稱為選用元件。
透過控制台建立 Dataproc 叢集時,我們啟用了選用元件,並選取「Jupyter Notebook」做為選用元件。
8. 清除資源
如要清除叢集,請在 Dataproc 控制台中選取叢集後,點選「Stop」。叢集停止後,點選「刪除」即可刪除叢集。
刪除 Dataproc 叢集後,請刪除已複製程式碼的 GCS 值區。
如要清除資源並停止任何不必要的計費,您必須先停止 Dataproc 叢集,然後才能將其刪除。
停止及刪除叢集之前,請確保寫入 HDFS 儲存空間的所有資料都已複製到 GCS 以用於持久儲存空間。
如要停止叢集,請按一下「Stop」。
叢集停止後,點選「刪除」即可刪除叢集。
在確認對話方塊中,按一下「刪除」來刪除叢集。