Dataproc Serverless

1. 總覽 - Google Dataproc

Dataproc 是具備高擴充性的全代管服務,能執行 Apache Spark、Apache Flink、Presto 及其他許多開放原始碼工具和架構,使用 Dataproc 翻新資料湖泊、ETL / ELT,以及安全數據資料學服務,無論是全球規模的資料都適用。Dataproc 也與多項 Google Cloud 服務完全整合,包括 BigQueryCloud StorageVertex AIDataplex

Dataproc 提供三種變種版本:

  • Dataproc Serverless 可讓您在不設定基礎架構和自動調度資源的情況下執行 PySpark 工作。Dataproc Serverless 支援 PySpark 批次工作負載和工作階段 / 筆記本。
  • Google Compute Engine 上的 Dataproc 除了 Flink 和 Presto 等開放原始碼工具外,您還可以管理 Hadoop YARN 叢集來處理 YARN 型 Spark 工作負載。您可以視需求自訂雲端式叢集,視需求增加垂直或水平資源調度,包括自動調度資源。
  • Google Kubernetes Engine 上的 Dataproc 可讓您在 GKE 基礎架構中設定 Dataproc 虛擬叢集,以便提交 Spark、PySpark、SparkR 或 Spark SQL 工作。

在本程式碼研究室中,您將瞭解多種使用 Dataproc Serverless 的方法。

Apache Spark 最初是用來在 Hadoop 叢集上執行,並採用 YARN 做為資源管理員。維護 Hadoop 叢集需要一組特定的專業知識,並確保叢集上許多不同的控制點已正確設定。除了一組獨立的安裝指令,Spark 也會要求使用者設定。這在許多情況下,開發人員會花更多時間設定基礎架構,而不是處理 Spark 程式碼本身。

有了 Dataproc Serverless,您就不需要手動設定 Hadoop 叢集或 Spark。Dataproc Serverless 無法在 Hadoop 上執行,會使用自己的動態資源分配來確定資源需求,包括自動調度資源。部分 Spark 屬性仍可透過 Dataproc Serverless 自訂,不過在大部分執行個體中,您都不需要調整這些屬性。

2. 設定

請先設定本程式碼研究室中使用的環境和資源。

建立 Google Cloud 專案。您可以使用現有帳戶。

Cloud 控制台工具列中點選 Cloud Shell,即可開啟 Cloud Shell。

ba0bb17945a73543.png

Cloud Shell 提供現成可用的 Shell 環境,方便您在這個程式碼研究室中使用。

68c4ebd2a8539764.png

根據預設,Cloud Shell 會設定您的專案名稱。執行 echo $GOOGLE_CLOUD_PROJECT 再次確認。如果您在輸出內容中沒有看到專案 ID,請設定該 ID。

export GOOGLE_CLOUD_PROJECT=<your-project-id>

為資源設定 Compute Engine 區域,例如 us-central1europe-west2

export REGION=<your-region>

啟用 API

程式碼研究室會使用下列 API:

  • BigQuery
  • Dataproc

啟用必要的 API。大約需要一分鐘,完成後會看到成功訊息。

gcloud services enable bigquery.googleapis.com
gcloud services enable dataproc.googleapis.com

設定網路存取權

如要使用 Dataproc Serverless,您必須在執行 Spark 工作的區域啟用 Google 私人存取權,因為 Spark 驅動程式和執行工具僅具有私人 IP。執行下列指令,在 default 子網路中啟用。

gcloud compute networks subnets update default \
  --region=${REGION} \
  --enable-private-ip-google-access

您可以透過以下指令確認 Google 私人存取權是否已啟用,輸出的內容會顯示 TrueFalse

gcloud compute networks subnets describe default \
  --region=${REGION} \
  --format="get(privateIpGoogleAccess)"

建立儲存空間值區

建立儲存空間值區,以便儲存在這個程式碼研究室中建立的資產。

選擇值區名稱。值區名稱在全域範圍內都不重複

export BUCKET=<your-bucket-name>

在要執行 Spark 工作的區域中建立值區。

gsutil mb -l ${REGION} gs://${BUCKET}

您可以在 Cloud Storage 控制台中看到自己的值區。您也可以執行 gsutil ls 來查看值區。

建立永久記錄伺服器

Spark UI 提供一組豐富的偵錯工具和 Spark 工作洞察資訊。如要查看已完成 Dataproc Serverless 工作的 Spark UI,您必須建立單一節點 Dataproc 叢集來做為永久記錄伺服器使用。

設定永久記錄伺服器的名稱。

PHS_CLUSTER_NAME=my-phs

執行下列指令。

gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \
    --region=${REGION} \
    --single-node \
    --enable-component-gateway \
    --properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history

本程式碼研究室後續章節會詳細介紹 Spark UI 和永久記錄伺服器。

3. 使用 Dataproc 批次執行無伺服器 Spark 工作

在這個範例中,您要使用 New York City (NYC) Citicycling Trips 公開資料集的一組資料。NYC Citicyclings 是紐約市內收費用的付費單車分享系統。您將執行簡單的轉換,並顯示前十大熱門 Citi 自行車站 ID 的 ID。此範例也使用開放原始碼 spark-bigquery-connector 在 Spark 與 BigQuery 之間流暢地讀取和寫入資料。

將下列 GitHub 存放區和 cd 複製到包含 citibike.py 檔案的目錄。

git clone https://github.com/GoogleCloudPlatform/devrel-demos.git
cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless

citibike.py

import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType

if len(sys.argv) == 1:
    print("Please provide a GCS bucket name.")

bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"

spark = SparkSession.builder \
          .appName("pyspark-example") \
          .config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
          .getOrCreate()

df = spark.read.format("bigquery").load(table)

top_ten = df.filter(col("start_station_id") \
            .isNotNull()) \
            .groupBy("start_station_id") \
            .count() \
            .orderBy("count", ascending=False) \
            .limit(10) \
            .cache()

top_ten.show()

top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")

使用 Cloud Shell 預設提供的 Cloud SDK,將工作提交至無伺服器 Spark。在殼層中執行下列指令,並使用 Cloud SDK 和 Dataproc Batches API 提交無伺服器 Spark 工作。

gcloud dataproc batches submit pyspark citibike.py \
  --batch=citibike-job \
  --region=${REGION} \
  --deps-bucket=gs://${BUCKET} \
  --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \
  -- ${BUCKET}

以下將詳細說明:

  • gcloud dataproc batches submit 會參照 Dataproc Batches API
  • pyspark 表示您正在提交 PySpark 工作。
  • --batch 是工作的名稱。如未提供,系統會使用隨機產生的 UUID。
  • --region=${REGION} 是工作要處理的地理區域。
  • 在 Serverless 環境中執行前,--deps-bucket=${BUCKET} 是本機 Python 檔案上傳的位置。
  • --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar 包含 Spark 執行階段環境中 spark-bigquery-connector 的 jar。
  • --history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER} 是永久記錄伺服器的完整名稱。這就是 Spark 事件資料 (與控制台輸出內容分開的) 儲存位置,方便您透過 Spark UI 查看。
  • 結尾的 -- 表示超出此範圍的任何內容都會是程式的執行階段引數。在這個案例中,您可以依據工作要求提交值區名稱。

批次提交後,您會看到下列輸出內容。

Batch [citibike-job] submitted.

幾分鐘後,您會看到以下輸出內容,以及工作的中繼資料。

+----------------+------+
|start_station_id| count|
+----------------+------+
|             519|551078|
|             497|423334|
|             435|403795|
|             426|384116|
|             293|372255|
|             402|367194|
|             285|344546|
|             490|330378|
|             151|318700|
|             477|311403|
+----------------+------+

Batch [citibike-job] finished.

下一節將說明如何找出這項工作的記錄檔。

其他功能

有了 Spark Serverless,您還有其他執行工作的選項。

  • 您可以建立執行工作的自訂 Docker 映像檔。這是加入其他依附元件 (包括 Python 和 R 程式庫) 的絕佳方式。
  • 您可以將 Dataproc Metastore 執行個體連結至工作,以便存取 Hive 中繼資料。
  • 如需進一步控管,Dataproc Serverless 支援少數 Spark 屬性設定。

4. Dataproc 指標和觀測能力

Dataproc 批次控制台會列出您所有的 Dataproc Serverless 工作。控制台中會顯示每項工作的「批次 ID」、「位置」、「狀態」、「建立時間」、「經過時間」及「類型」。按一下工作的「批次 ID」,即可查看工作詳細資訊。

這個頁面中會顯示「Monitoring」(監控) 等資訊,其中顯示工作在一段時間內使用了多少 Batch Spark 執行者 (表示該工作自動調度資源的數量)。

在「詳細資料」分頁中,您會看到更多與工作相關的中繼資料,包括提交的所有引數和參數。

您也可以從這個頁面存取所有記錄。Dataproc Serverless 工作執行時,會產生三種不同的記錄檔:

  • 服務層級
  • 控制台輸出
  • Spark 事件記錄功能

服務層級:包含 Dataproc Serverless 服務產生的記錄檔。包括 Dataproc Serverless 要求自動調度資源的額外 CPU。如要查看上述記錄,請按一下「查看記錄檔」,開啟 Cloud Logging

您可以在「Output」(輸出) 下方查看主控台輸出內容。這是工作產生的輸出內容,包括 Spark 啟動時列印的中繼資料,或是納入工作的任何輸出陳述式。

您可以透過 Spark UI 存取 Spark 事件記錄功能。由於您使用永久記錄伺服器提供 Spark 工作,因此可以按一下「查看 Spark History Server」(查看 Spark 記錄伺服器) 來存取 Spark UI,因為其中含有先前執行 Spark 工作的資訊。如要進一步瞭解 Spark UI,請參閱官方的 Spark 說明文件

5. Dataproc 範本:BQ ->GCS

Dataproc 範本是開放原始碼工具,可協助進一步簡化雲端內的資料處理工作。這些 API 可做為 Dataproc Serverless 包裝函式,並提供適用於許多資料匯入和匯出工作的範本,包括:

  • BigQuerytoGCSGCStoBigQuery
  • GCStoBigTable
  • GCStoJDBCJDBCtoGCS
  • HivetoBigQuery
  • MongotoGCSGCStoMongo

如需完整清單,請參閱 README

在本節中,您將使用 Dataproc 範本,將資料從 BigQuery 匯出至 GCS

建立存放區的本機複本

複製存放區的本機複本,並變更為 python 資料夾。

git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git
cd dataproc-templates/python

設定環境

現在請設定環境變數。Dataproc 範本會使用環境變數 GCP_PROJECT 做為專案 ID,因此請將此設為 GOOGLE_CLOUD_PROJECT.

export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}

您的區域應在更早之前於環境中設定。如果沒有,請在此進行設定。

export REGION=<region>

Dataproc 範本使用 spark-bigquery-conector 處理 BigQuery 工作,並要求在環境變數 JARS 中納入 URI。設定 JARS 變數。

export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"

設定範本參數

設定要使用服務的暫存值區名稱。

export GCS_STAGING_LOCATION=gs://${BUCKET}

接下來,您將設定一些工作專屬的變數。至於輸入資料表,您會再次參照 BigQuery NYC Citibike 資料集。

BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips

你可以選擇 csvparquetavrojson。在本程式碼研究室中,請選擇 CSV - 在下一個部分中,瞭解如何使用 Dataproc 範本轉換檔案類型。

BIGQUERY_GCS_OUTPUT_FORMAT=csv

將輸出模式設為 overwrite。可用選項有 overwriteappendignoreerrorifexists.

BIGQUERY_GCS_OUTPUT_MODE=overwrite

將 GCS 輸出位置設為值區中的路徑。

BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS

執行範本

在下方指定 BIGQUERYTOGCS 範本並提供您設定的輸入參數,即可執行該範本。

./bin/start.sh \
-- --template=BIGQUERYTOGCS \
        --bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \
        --bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \
        --bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \
        --bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}

輸出內容相當雜訊,但大約一分鐘後,您會看到以下結果。

Batch [5766411d6c78444cb5e80f305308d8f8] submitted.
...
Batch [5766411d6c78444cb5e80f305308d8f8] finished.

您可以執行下列指令,確認檔案是否確實產生。

gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}

根據預設,Spark 會寫入多個檔案,視資料量而定。在這個案例中,您會看到大約 30 個產生的檔案。Spark 輸出檔案名稱的格式為 part - 後面加上 5 位數字 (表示零件編號) 和雜湊字串。如果是大量資料,Spark 通常會寫入數個檔案。範例檔案名稱為 part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv

6. Dataproc 範本:CSV 用於 parquet

您現在可以透過 Dataproc 範本,透過 GCSTOGCS 將 GCS 中的資料類型轉換為其他檔案類型。這個範本使用 SparkSQL,並提供選項來提交要在轉換期間處理的 SparkSQL 查詢,以便進行其他處理。

確認環境變數

確認 GCP_PROJECTREGIONGCS_STAGING_BUCKET 來自上一節的設定。

echo ${GCP_PROJECT}
echo ${REGION}
echo ${GCS_STAGING_LOCATION}

設定範本參數

您將為 GCStoGCS 設定設定參數。從輸入檔案的位置開始。請注意,這是目錄,而非特定檔案,因為系統會處理目錄中的所有檔案。設為 BIGQUERY_GCS_OUTPUT_LOCATION

GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}

設定輸入檔案的格式。

GCS_TO_GCS_INPUT_FORMAT=csv

設定所需的輸出格式。您可以選擇 parquet、json、avro 或 csv。

GCS_TO_GCS_OUTPUT_FORMAT=parquet

將輸出模式設為 overwrite。可用選項有 overwriteappendignoreerrorifexists.

GCS_TO_GCS_OUTPUT_MODE=overwrite

設定輸出位置。

GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS

執行範本

執行 GCStoGCS 範本。

./bin/start.sh \
-- --template=GCSTOGCS \
        --gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \
        --gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \
        --gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \
        --gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \
        --gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}

輸出內容相當雜訊,但大約一分鐘後,您應該會看到以下成功訊息。

Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted.
...
Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.

您可以執行下列指令,確認檔案是否確實產生。

gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}

透過這個範本,您也可以選擇將 gcs.to.gcs.temp.view.namegcs.to.gcs.sql.query 傳遞至範本,以便提供 SparkSQL 查詢,以便在寫入 GCS 之前對資料執行 SparkSQL 查詢。

7. 清除資源

完成本程式碼研究室之後,如要避免系統向您的 GCP 帳戶收取不必要的費用,請按照下列步驟操作:

  1. 針對您建立的環境刪除 Cloud Storage 值區
gsutil rm -r gs://${BUCKET}
  1. 刪除永久記錄伺服器使用的 Dataproc 叢集
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \
  --region=${REGION}
  1. 刪除 Dataproc Serverless 工作。前往批次控制台,在要刪除的工作旁按一下方塊,然後點選「刪除」

如果您只針對本程式碼研究室建立專案,也可以選擇刪除專案:

  1. 在 GCP Console 中,前往「Projects」(專案) 頁面。
  2. 在專案清單中,選取要刪除的專案,然後按一下 [刪除]。
  3. 在方塊中輸入專案 ID,然後按一下「關閉」以刪除專案。

8. 後續步驟

下列資源提供更多運用無伺服器 Spark 的方法: