Dataproc Serverless

1. 概览 - Google Dataproc

Dataproc 是一项扩缩能力极强的全托管式服务,可用于运行 Apache Spark、Apache Flink、Presto 以及许多其他开源工具和框架。使用 Dataproc 在全球范围内实现数据湖现代化改造、ETL / ELT 以及保障数据科学安全。Dataproc 还完全集成了多项 Google Cloud 服务,包括 BigQueryCloud StorageVertex AIDataplex

Dataproc 提供三种版本:

  • 借助 Dataproc Serverless,您无需配置基础架构和自动扩缩即可运行 PySpark 作业。Dataproc Serverless 支持 PySpark 批处理工作负载和会话 / 笔记本。
  • 借助 Google Compute Engine 上的 Dataproc,除了 Flink 和 Presto 等开源工具之外,您还可以为基于 YARN 的 Spark 工作负载管理 Hadoop YARN 集群。您可以定制自己的云端集群,根据需要进行任意数量的纵向或横向扩缩(包括自动扩缩)。
  • Google Kubernetes Engine 上的 Dataproc 允许您在 GKE 基础架构中配置 Dataproc 虚拟集群,以提交 Spark、PySpark、SparkR 或 Spark SQL 作业。

在此 Codelab 中,您将了解使用 Dataproc Serverless 的几种不同方式。

Apache Spark 最初设计为在 Hadoop 集群上运行,并使用 YARN 作为其资源管理器。维护 Hadoop 集群需要一组特定的专业知识,并确保集群上的许多不同旋钮都得到正确配置。这是对 Spark 同样要求用户设置的一组单独旋钮的补充。这导致了许多情况下,开发者需要花费更多时间来配置基础架构,而不是处理 Spark 代码本身。

借助 Dataproc Serverless,您无需手动配置 Hadoop 集群或 Spark。Dataproc Serverless 不会在 Hadoop 上运行,它使用自己的动态资源分配来确定其资源需求,包括自动扩缩。一小部分 Spark 属性仍然可以使用 Dataproc Serverless 进行自定义,但在大多数情况下,您无需调整这些属性。

2. 设置

首先,您需要配置此 Codelab 中使用的环境和资源。

创建 Google Cloud 项目。您可以使用现有的代码。

Cloud 控制台工具栏中点击 Cloud Shell 将其打开。

ba0bb17945a73543.png

Cloud Shell 提供了一个可用于此 Codelab 的现成 Shell 环境。

68c4ebd2a8539764

默认情况下,Cloud Shell 将设置您的项目名称。请运行 echo $GOOGLE_CLOUD_PROJECT 仔细检查。如果输出中未显示您的项目 ID,请进行设置。

export GOOGLE_CLOUD_PROJECT=<your-project-id>

为您的资源设置 Compute Engine 区域,例如 us-central1europe-west2

export REGION=<your-region>

启用 API

此 Codelab 使用了以下 API:

  • BigQuery
  • Dataproc

启用必要的 API。此过程大约需要一分钟时间,完成后会显示一条成功消息。

gcloud services enable bigquery.googleapis.com
gcloud services enable dataproc.googleapis.com

配置网络访问权限

Dataproc Serverless 要求在将运行 Spark 作业的区域启用 Google 专用访问通道,因为 Spark 驱动程序和执行程序只有专用 IP。运行以下命令,以在 default 子网中启用它。

gcloud compute networks subnets update default \
  --region=${REGION} \
  --enable-private-ip-google-access

您可以通过以下方式验证是否已启用 Google 专用访问通道,以下代码会输出 TrueFalse

gcloud compute networks subnets describe default \
  --region=${REGION} \
  --format="get(privateIpGoogleAccess)"

创建存储桶

创建一个存储分区,用于存储在此 Codelab 中创建的资源。

为您的存储分区选择一个名称。存储分区名称在所有用户中必须是全局唯一的

export BUCKET=<your-bucket-name>

在要运行 Spark 作业的区域中创建存储分区。

gsutil mb -l ${REGION} gs://${BUCKET}

您可以看到 Cloud Storage 控制台中提供了您的存储分区。您还可以运行 gsutil ls 来查看您的存储分区。

创建永久性历史记录服务器

Spark 界面提供一组丰富的调试工具和有关 Spark 作业的数据分析。如需查看已完成的 Dataproc Serverless 作业的 Spark 界面,您必须创建单节点 Dataproc 集群以用作永久性历史记录服务器

为您的永久性历史记录服务器设置一个名称。

PHS_CLUSTER_NAME=my-phs

请运行以下命令。

gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \
    --region=${REGION} \
    --single-node \
    --enable-component-gateway \
    --properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history

此 Codelab 稍后会详细介绍 Spark 界面和永久性历史记录服务器。

3. 使用 Dataproc 批次运行无服务器 Spark 作业

在此示例中,您将使用纽约市 (NYC) Citi Bike Trips 公共数据集中的一组数据。NYC Citi Bikes 是纽约市的付费共享单车系统。您将执行一些简单的转换,并输出十大热门的花旗单车站点 ID。此外,该示例还特别使用开源 spark-bigquery-connector 在 Spark 和 BigQuery 之间无缝读取和写入数据。

将以下 GitHub 代码库和 cd 克隆到包含文件 citibike.py 的目录中。

git clone https://github.com/GoogleCloudPlatform/devrel-demos.git
cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless

citibike.py

import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType

if len(sys.argv) == 1:
    print("Please provide a GCS bucket name.")

bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"

spark = SparkSession.builder \
          .appName("pyspark-example") \
          .config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
          .getOrCreate()

df = spark.read.format("bigquery").load(table)

top_ten = df.filter(col("start_station_id") \
            .isNotNull()) \
            .groupBy("start_station_id") \
            .count() \
            .orderBy("count", ascending=False) \
            .limit(10) \
            .cache()

top_ten.show()

top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")

使用 Cloud SDK 中默认提供的 Cloud SDK 将作业提交到无服务器 Spark。在 shell 中运行以下命令,该 shell 利用 Cloud SDK 和 Dataproc Batches API 提交无服务器 Spark 作业。

gcloud dataproc batches submit pyspark citibike.py \
  --batch=citibike-job \
  --region=${REGION} \
  --deps-bucket=gs://${BUCKET} \
  --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \
  -- ${BUCKET}

其详细解释如下:

  • gcloud dataproc batches submit 引用 Dataproc Batches API
  • pyspark 表示您正在提交 PySpark 作业。
  • --batch 是作业的名称。如果未提供,将使用随机生成的 UUID。
  • --region=${REGION} 是处理作业的地理区域。
  • 在无服务器环境中运行前,您的本地 Python 文件将上传到 --deps-bucket=${BUCKET}
  • --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar 包含 Spark 运行时环境中 spark-bigquery-connector 的 jar。
  • --history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER} 是永久性历史记录服务器的完全限定名称。这是 Spark 事件数据(独立于控制台输出)的存储和从 Spark 界面进行查看的位置。
  • 尾随的 -- 表示超出此值的任何内容都将成为程序的运行时参数。在这种情况下,您需要按照作业的要求提交存储分区的名称。

提交批量工作负载后,您会看到以下输出内容。

Batch [citibike-job] submitted.

几分钟后,您将看到以下输出以及该作业的元数据。

+----------------+------+
|start_station_id| count|
+----------------+------+
|             519|551078|
|             497|423334|
|             435|403795|
|             426|384116|
|             293|372255|
|             402|367194|
|             285|344546|
|             490|330378|
|             151|318700|
|             477|311403|
+----------------+------+

Batch [citibike-job] finished.

在下一部分中,您将了解如何查找此作业的日志。

其他功能

借助 Spark Serverless,您可以通过其他方式运行作业。

  • 您可以创建用于运行作业的自定义 Docker 映像。这是添加其他依赖项(包括 Python 和 R 库)的好方法。
  • 您可以将 Dataproc Metastore 实例连接到作业以访问 Hive 元数据。
  • 为了实现额外的控制,Dataproc Serverless 支持对一小部分 Spark 属性进行配置。

4. Dataproc 指标和可观测性

Dataproc 批处理控制台列出了您的所有 Dataproc Serverless 作业。在控制台中,您将看到每个作业的批次 ID、位置、状态创建时间、所用时间类型。点击作业的批次 ID 以查看相关详细信息。

在此页面上,您会看到监控等信息,其中会显示您的作业在一段时间内使用的 Batch Spark 执行器数量(指示其自动扩缩的程度)。

详细信息标签页上,您将看到有关作业的更多元数据,包括随作业一起提交的所有参数。

您还可以通过此页面访问所有日志。运行 Dataproc 无服务器作业时,会生成三组不同的日志:

  • 服务层面
  • 控制台输出
  • Spark 事件日志记录

服务级包括 Dataproc Serverless 服务生成的日志。其中包括 Dataproc Serverless 请求额外的 CPU 以实现自动扩缩等。点击查看日志即可打开 Cloud Logging,以查看这些日志。

您可以在输出下查看控制台输出。这是作业生成的输出,包括 Spark 在开始作业时输出的元数据或包含在作业中的任何输出语句。

可以从 Spark 界面访问 Spark 事件日志记录。由于您为 Spark 作业提供了永久性历史记录服务器,因此您可以点击查看 Spark 历史记录服务器来访问 Spark 界面,其中包含之前运行的 Spark 作业的信息。如需详细了解 Spark 界面,请参阅官方 Spark 文档

5. Dataproc 模板:BQ ->GCS

Dataproc 模板是一种开源工具,有助于进一步简化云端数据处理任务。这些容器可用作 Dataproc Serverless 的封装容器,并包含用于许多数据导入和导出任务的模板,包括:

  • BigQuerytoGCSGCStoBigQuery
  • GCStoBigTable
  • GCStoJDBCJDBCtoGCS
  • HivetoBigQuery
  • MongotoGCSGCStoMongo

有关完整列表,请参见自述文件

在本部分中,您将使用 Dataproc 模板将数据从 BigQuery 导出到 GCS

克隆代码库

克隆代码库并切换到 python 文件夹。

git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git
cd dataproc-templates/python

配置环境

您现在将设置环境变量。Dataproc 模板使用环境变量 GCP_PROJECT 作为您的项目 ID,因此请将此变量设置为 GOOGLE_CLOUD_PROJECT.

export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}

您的区域应在之前的环境中设置。如果没有,请在此处进行设置。

export REGION=<region>

Dataproc 模板使用 spark-bigquery-conector 处理 BigQuery 作业,并要求将 URI 包含在环境变量 JARS 中。设置 JARS 变量。

export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"

配置模板参数

设置要使用的服务使用的暂存存储分区的名称。

export GCS_STAGING_LOCATION=gs://${BUCKET}

接下来,您将设置一些特定于作业的变量。对于输入表,您将再次引用 BigQuery NYC Citibike 数据集。

BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips

您可以选择 csvparquetavrojson。对于此 Codelab,请选择 CSV - 在下一部分中,如何使用 Dataproc 模板转换文件类型。

BIGQUERY_GCS_OUTPUT_FORMAT=csv

将输出模式设置为 overwrite。您可以选择 overwriteappendignoreerrorifexists.

BIGQUERY_GCS_OUTPUT_MODE=overwrite

将 GCS 输出位置设置为存储分区中的路径。

BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS

运行模板

运行 BIGQUERYTOGCS 模板,方法是在下面指定该模板并提供您设置的输入参数。

./bin/start.sh \
-- --template=BIGQUERYTOGCS \
        --bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \
        --bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \
        --bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \
        --bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}

输出会有些嘈杂,但大约一分钟后,您会看到以下内容。

Batch [5766411d6c78444cb5e80f305308d8f8] submitted.
...
Batch [5766411d6c78444cb5e80f305308d8f8] finished.

您可以通过运行以下命令来验证文件是否已生成。

gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}

默认情况下,Spark 会根据数据量写入多个文件。在这种情况下,您会看到大约 30 个生成的文件。Spark 输出文件名采用 part 格式,后跟一个五位数(表示部件号)和一个哈希字符串。对于大量数据,Spark 通常会写出多个文件。示例文件名为 part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv

6. Dataproc 模板:将 CSV 转换为 Parquet

现在,您将使用 Dataproc 模板,通过 GCSTOGCS 将 GCS 中的数据从一种文件类型转换为另一种文件类型。此模板使用 SparkSQL,并提供一个选项,以便同时提交要在转换期间处理的 SparkSQL 查询,以进行其他处理。

确认环境变量

确认在上一部分中设置了 GCP_PROJECTREGIONGCS_STAGING_BUCKET

echo ${GCP_PROJECT}
echo ${REGION}
echo ${GCS_STAGING_LOCATION}

设置模板参数

现在,您将设置 GCStoGCS 的配置参数。从输入文件的位置开始。请注意,这是一个目录而非特定文件,因为系统会处理该目录中的所有文件。将此项设为 BIGQUERY_GCS_OUTPUT_LOCATION

GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}

设置输入文件的格式。

GCS_TO_GCS_INPUT_FORMAT=csv

设置所需的输出格式。您可以选择 parquet、json、avro 或 csv。

GCS_TO_GCS_OUTPUT_FORMAT=parquet

将输出模式设置为 overwrite。您可以选择 overwriteappendignoreerrorifexists.

GCS_TO_GCS_OUTPUT_MODE=overwrite

设置输出位置。

GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS

运行模板

运行 GCStoGCS 模板。

./bin/start.sh \
-- --template=GCSTOGCS \
        --gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \
        --gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \
        --gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \
        --gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \
        --gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}

输出将非常嘈杂,但大约一分钟后,您应该会看到如下所示的成功消息。

Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted.
...
Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.

您可以通过运行以下命令来验证文件是否已生成。

gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}

使用此模板,您还可以选择提供 SparkSQL 查询,方法是将 gcs.to.gcs.temp.view.namegcs.to.gcs.sql.query 传递给模板,从而在将数据写入 GCS 之前对数据运行 SparkSQL 查询。

7. 清理资源

为避免在完成此 Codelab 后向您的 GCP 账号产生不必要的费用,请执行以下操作:

  1. 删除您创建的环境对应的 Cloud Storage 存储分区
gsutil rm -r gs://${BUCKET}
  1. 删除用于永久性历史记录服务器的 Dataproc 集群
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \
  --region=${REGION}
  1. 删除 Dataproc 无服务器作业。转到批处理控制台,点击要删除的每个作业旁边的复选框,然后点击删除

如果您专门为此 Codelab 创建了一个项目,也可以选择删除该项目:

  1. 在 GCP Console 中,转到项目页面。
  2. 在项目列表中,选择要删除的项目,然后点击“删除”。
  3. 在框中输入项目 ID,然后点击“关停”以删除项目。

8. 后续操作

以下资源提供了利用 Serverless Spark 的其他方式: