1. 概览 - Google Dataproc
Dataproc 是一项扩缩能力极强的全代管式服务,可用于运行 Apache Spark、Apache Flink、Presto 以及许多其他开源工具和框架。使用 Dataproc 在全球范围内实现数据湖现代化改造、ETL / ELT 以及确保数据科学安全。Dataproc 还完全集成了多项 Google Cloud 服务,包括 BigQuery、Cloud Storage、Vertex AI 和 Dataplex。
Dataproc 提供三种版本:
- 借助 Dataproc Serverless,您无需配置基础架构和自动扩缩即可运行 PySpark 作业。Dataproc Serverless 支持 PySpark 批处理工作负载和会话 / 笔记本。
- 借助 Google Compute Engine 上的 Dataproc,除了 Flink 和 Presto 等开源工具之外,您还可以为基于 YARN 的 Spark 工作负载管理 Hadoop YARN 集群。您可以定制自己的云端集群,根据需要进行任意数量的纵向或横向扩缩(包括自动扩缩)。
- Google Kubernetes Engine 上的 Dataproc 允许您在 GKE 基础架构中配置 Dataproc 虚拟集群,以提交 Spark、PySpark、SparkR 或 Spark SQL 作业。
在此 Codelab 中,您将了解使用 Dataproc Serverless 的几种不同方式。
Apache Spark 最初设计为在 Hadoop 集群上运行,并使用 YARN 作为其资源管理器。维护 Hadoop 集群需要一组特定的专业知识,并确保集群上的许多不同旋钮都得到正确配置。这是对 Spark 同样要求用户设置的一组单独旋钮的补充。这导致了许多情况下,开发者需要花费更多时间来配置基础架构,而不是处理 Spark 代码本身。
借助 Dataproc Serverless,您无需手动配置 Hadoop 集群或 Spark。Dataproc Serverless 不会在 Hadoop 上运行,它使用自己的动态资源分配来确定其资源需求,包括自动扩缩。一小部分 Spark 属性仍然可以使用 Dataproc Serverless 进行自定义,但在大多数情况下,您无需调整这些属性。
2. 设置
首先,您需要配置此 Codelab 中使用的环境和资源。
创建 Google Cloud 项目。您可以使用现有的代码。
在 Cloud 控制台工具栏中点击 Cloud Shell 将其打开。
Cloud Shell 提供了一个可用于此 Codelab 的现成 Shell 环境。
默认情况下,Cloud Shell 将设置您的项目名称。请运行 echo $GOOGLE_CLOUD_PROJECT
仔细检查。如果输出中未显示您的项目 ID,请进行设置。
export GOOGLE_CLOUD_PROJECT=<your-project-id>
为您的资源设置 Compute Engine 区域,例如 us-central1
或 europe-west2
。
export REGION=<your-region>
启用 API
此 Codelab 使用了以下 API:
- BigQuery
- Dataproc
启用必要的 API。此过程大约需要一分钟时间,完成后会显示一条成功消息。
gcloud services enable bigquery.googleapis.com gcloud services enable dataproc.googleapis.com
配置网络访问权限
Dataproc Serverless 要求在将运行 Spark 作业的区域启用 Google 专用访问通道,因为 Spark 驱动程序和执行程序只有专用 IP。运行以下命令,以在 default
子网中启用它。
gcloud compute networks subnets update default \ --region=${REGION} \ --enable-private-ip-google-access
您可以通过以下方式验证是否已启用 Google 专用访问通道,以下代码会输出 True
或 False
。
gcloud compute networks subnets describe default \ --region=${REGION} \ --format="get(privateIpGoogleAccess)"
创建存储桶
创建一个存储分区,用于存储在此 Codelab 中创建的资源。
为您的存储分区选择一个名称。存储分区名称在所有用户中必须是全局唯一的。
export BUCKET=<your-bucket-name>
在要运行 Spark 作业的区域中创建存储分区。
gsutil mb -l ${REGION} gs://${BUCKET}
您可以看到 Cloud Storage 控制台中提供了您的存储分区。您还可以运行 gsutil ls
来查看您的存储分区。
创建永久性历史记录服务器
Spark 界面提供一组丰富的调试工具和有关 Spark 作业的数据分析。如需查看已完成的 Dataproc Serverless 作业的 Spark 界面,您必须创建单节点 Dataproc 集群以用作永久性历史记录服务器。
为您的永久性历史记录服务器设置一个名称。
PHS_CLUSTER_NAME=my-phs
请运行以下命令。
gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \ --region=${REGION} \ --single-node \ --enable-component-gateway \ --properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history
此 Codelab 稍后会详细介绍 Spark 界面和永久性历史记录服务器。
3. 使用 Dataproc 批次运行无服务器 Spark 作业
在此示例中,您将使用纽约市 (NYC) Citi Bike Trips 公共数据集中的一组数据。NYC Citi Bikes 是纽约市的付费共享单车系统。您将执行一些简单的转换,并输出十大热门的花旗单车站点 ID。此外,该示例还特别使用开源 spark-bigquery-connector 在 Spark 和 BigQuery 之间无缝读取和写入数据。
将以下 GitHub 代码库和 cd
克隆到包含文件 citibike.py
的目录中。
git clone https://github.com/GoogleCloudPlatform/devrel-demos.git cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless
citibike.py
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType
if len(sys.argv) == 1:
print("Please provide a GCS bucket name.")
bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"
spark = SparkSession.builder \
.appName("pyspark-example") \
.config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
.getOrCreate()
df = spark.read.format("bigquery").load(table)
top_ten = df.filter(col("start_station_id") \
.isNotNull()) \
.groupBy("start_station_id") \
.count() \
.orderBy("count", ascending=False) \
.limit(10) \
.cache()
top_ten.show()
top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")
使用 Cloud SDK 中默认提供的 Cloud SDK 将作业提交到无服务器 Spark。在 shell 中运行以下命令,该 shell 利用 Cloud SDK 和 Dataproc Batches API 提交无服务器 Spark 作业。
gcloud dataproc batches submit pyspark citibike.py \ --batch=citibike-job \ --region=${REGION} \ --deps-bucket=gs://${BUCKET} \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \ --history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \ -- ${BUCKET}
其详细解释如下:
gcloud dataproc batches submit
引用 Dataproc Batches API。pyspark
表示您正在提交 PySpark 作业。--batch
是作业的名称。如果未提供,将使用随机生成的 UUID。--region=${REGION}
是处理作业的地理区域。- 在无服务器环境中运行前,您的本地 Python 文件将上传到
--deps-bucket=${BUCKET}
。 --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar
包含 Spark 运行时环境中 spark-bigquery-connector 的 jar。--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER}
是永久性历史记录服务器的完全限定名称。这是 Spark 事件数据(独立于控制台输出)的存储和从 Spark 界面进行查看的位置。- 尾随的
--
表示超出此值的任何内容都将成为程序的运行时参数。在这种情况下,您需要按照作业的要求提交存储分区的名称。
提交批量工作负载后,您会看到以下输出内容。
Batch [citibike-job] submitted.
几分钟后,您将看到以下输出以及该作业的元数据。
+----------------+------+ |start_station_id| count| +----------------+------+ | 519|551078| | 497|423334| | 435|403795| | 426|384116| | 293|372255| | 402|367194| | 285|344546| | 490|330378| | 151|318700| | 477|311403| +----------------+------+ Batch [citibike-job] finished.
在下一部分中,您将了解如何查找此作业的日志。
其他功能
借助 Spark Serverless,您可以通过其他方式运行作业。
- 您可以创建用于运行作业的自定义 Docker 映像。这是添加其他依赖项(包括 Python 和 R 库)的好方法。
- 您可以将 Dataproc Metastore 实例连接到作业以访问 Hive 元数据。
- 为了实现额外的控制,Dataproc Serverless 支持对一小部分 Spark 属性进行配置。
4. Dataproc 指标和可观测性
Dataproc 批处理控制台列出了您的所有 Dataproc Serverless 作业。在控制台中,您将看到每个作业的批次 ID、位置、状态、创建时间、所用时间和类型。点击作业的批次 ID 以查看相关详细信息。
在此页面上,您会看到监控等信息,其中会显示您的作业在一段时间内使用的 Batch Spark 执行器数量(指示其自动扩缩的程度)。
在详细信息标签页上,您将看到有关作业的更多元数据,包括随作业一起提交的所有参数。
您还可以通过此页面访问所有日志。运行 Dataproc 无服务器作业时,会生成三组不同的日志:
- 服务层面
- 控制台输出
- Spark 事件日志记录
服务级包括 Dataproc Serverless 服务生成的日志。其中包括 Dataproc Serverless 请求额外的 CPU 以实现自动扩缩等。点击查看日志即可打开 Cloud Logging,以查看这些日志。
您可以在输出下查看控制台输出。这是作业生成的输出,包括 Spark 在开始作业时输出的元数据或包含在作业中的任何打印语句。
可以从 Spark 界面访问 Spark 事件日志记录。由于您为 Spark 作业提供了永久性历史记录服务器,因此您可以点击查看 Spark 历史记录服务器来访问 Spark 界面,其中包含之前运行的 Spark 作业的信息。如需详细了解 Spark 界面,请参阅官方 Spark 文档。
5. Dataproc 模板:BQ ->GCS
Dataproc 模板是一种开源工具,有助于进一步简化云端数据处理任务。这些容器可用作 Dataproc Serverless 的封装容器,并包含用于许多数据导入和导出任务的模板,包括:
BigQuerytoGCS
和GCStoBigQuery
GCStoBigTable
GCStoJDBC
和JDBCtoGCS
HivetoBigQuery
MongotoGCS
和GCStoMongo
有关完整列表,请参见README。
在本部分中,您将使用 Dataproc 模板将数据从 BigQuery 导出到 GCS。
克隆代码库
克隆代码库并切换到 python
文件夹。
git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git cd dataproc-templates/python
配置环境
您现在将设置环境变量。Dataproc 模板使用环境变量 GCP_PROJECT
作为您的项目 ID,因此请将此变量设置为 GOOGLE_CLOUD_PROJECT.
export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}
您的区域应在之前的环境中设置。如果没有,请在此处进行设置。
export REGION=<region>
Dataproc 模板使用 spark-bigquery-conector 处理 BigQuery 作业,并要求将 URI 包含在环境变量 JARS
中。设置 JARS
变量。
export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"
配置模板参数
设置要使用的服务使用的暂存存储分区的名称。
export GCS_STAGING_LOCATION=gs://${BUCKET}
接下来,您将设置一些特定于作业的变量。对于输入表,您将再次引用 BigQuery NYC Citibike 数据集。
BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips
您可以选择 csv
、parquet
、avro
或 json
。对于此 Codelab,请选择 CSV - 在下一部分中,如何使用 Dataproc 模板转换文件类型。
BIGQUERY_GCS_OUTPUT_FORMAT=csv
将输出模式设置为 overwrite
。您可以选择 overwrite
、append
、ignore
或 errorifexists.
。
BIGQUERY_GCS_OUTPUT_MODE=overwrite
将 GCS 输出位置设置为存储分区中的路径。
BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS
运行模板
运行 BIGQUERYTOGCS
模板,方法是在下面指定该模板并提供您设置的输入参数。
./bin/start.sh \ -- --template=BIGQUERYTOGCS \ --bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \ --bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \ --bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \ --bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}
输出会有些嘈杂,但大约一分钟后,您会看到以下内容。
Batch [5766411d6c78444cb5e80f305308d8f8] submitted. ... Batch [5766411d6c78444cb5e80f305308d8f8] finished.
您可以通过运行以下命令来验证文件是否已生成。
gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}
默认情况下,Spark 会根据数据量写入多个文件。在这种情况下,您会看到大约 30 个生成的文件。Spark 输出文件名采用 part
格式,后跟一个五位数(表示部件号)和一个哈希字符串。对于大量数据,Spark 通常会写出多个文件。示例文件名为 part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv
。
6. Dataproc 模板:将 CSV 文件转换为 Parquet
现在,您将使用 Dataproc 模板,通过 GCSTOGCS 将 GCS 中的数据从一种文件类型转换为另一种文件类型。此模板使用 SparkSQL,并提供一个选项,以便同时提交要在转换期间处理的 SparkSQL 查询,以进行其他处理。
确认环境变量
确认在上一部分中设置了 GCP_PROJECT
、REGION
和 GCS_STAGING_BUCKET
。
echo ${GCP_PROJECT} echo ${REGION} echo ${GCS_STAGING_LOCATION}
设置模板参数
现在,您将设置 GCStoGCS
的配置参数。从输入文件的位置开始。请注意,这是一个目录而非特定文件,因为系统会处理该目录中的所有文件。将此项设为 BIGQUERY_GCS_OUTPUT_LOCATION
。
GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}
设置输入文件的格式。
GCS_TO_GCS_INPUT_FORMAT=csv
设置所需的输出格式。您可以选择 parquet、json、avro 或 csv。
GCS_TO_GCS_OUTPUT_FORMAT=parquet
将输出模式设置为 overwrite
。您可以选择 overwrite
、append
、ignore
或 errorifexists.
。
GCS_TO_GCS_OUTPUT_MODE=overwrite
设置输出位置。
GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS
运行模板
运行 GCStoGCS
模板。
./bin/start.sh \ -- --template=GCSTOGCS \ --gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \ --gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \ --gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \ --gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \ --gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}
输出将非常嘈杂,但大约一分钟后,您应该会看到如下所示的成功消息。
Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted. ... Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.
您可以通过运行以下命令来验证文件是否已生成。
gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}
使用此模板,您还可以选择提供 SparkSQL 查询,方法是将 gcs.to.gcs.temp.view.name
和 gcs.to.gcs.sql.query
传递给模板,从而在将数据写入 GCS 之前对数据运行 SparkSQL 查询。
7. 清理资源
为避免在完成此 Codelab 后向您的 GCP 账号产生不必要的费用,请执行以下操作:
gsutil rm -r gs://${BUCKET}
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \ --region=${REGION}
- 删除 Dataproc 无服务器作业。转到批处理控制台,点击要删除的每个作业旁边的复选框,然后点击删除。
如果您专门为此 Codelab 创建了一个项目,也可以选择删除该项目:
- 在 GCP Console 中,转到项目页面。
- 在项目列表中,选择要删除的项目,然后点击“删除”。
- 在框中输入项目 ID,然后点击“关停”以删除项目。
8. 后续步骤
以下资源提供了利用 Serverless Spark 的其他方式:
- 了解如何使用 Cloud Composer 编排 Dataproc 无服务器工作流。
- 了解如何将 Dataproc Serverless 与 Kubeflow 流水线集成。