1. 简介 - Google Dataproc
Dataproc 是一项扩缩能力极强的全代管式服务,可用于运行 Apache Spark、Apache Flink、Presto 以及许多其他开源工具和框架。使用 Dataproc 在全球范围内实现数据湖现代化改造、ETL / ELT 以及确保数据科学安全。Dataproc 还与多项 Google Cloud 服务全面集成,包括 BigQuery、Cloud Storage、Vertex AI 和 Dataplex。
Dataproc 提供三种版本:
- 借助 Dataproc Serverless,您无需配置基础架构和自动扩缩即可运行 PySpark 作业。Dataproc Serverless 支持 PySpark 批处理工作负载和会话 / 笔记本。
- 借助 Google Compute Engine 上的 Dataproc,除了 Flink 和 Presto 等开源工具之外,您还可以为基于 YARN 的 Spark 工作负载管理 Hadoop YARN 集群。您可以定制自己的云端集群,根据需要进行任意数量的纵向或横向扩缩(包括自动扩缩)。
- Google Kubernetes Engine 上的 Dataproc 允许您在 GKE 基础架构中配置 Dataproc 虚拟集群,以提交 Spark、PySpark、SparkR 或 Spark SQL 作业。
2. 在 Google Cloud VPC 上创建 Dataproc 集群
在此步骤中,您将使用 Google Cloud 控制台在 Google Cloud 上创建 Dataproc 集群。
首先,在控制台上启用 Dataproc 服务 API。启用后,搜索“Dataproc”,然后点击创建集群。
选择 Compute Engine 上的集群,以使用 Google Compute Engine(GCE) 虚拟机作为运行 Dataproc 集群的底层基础架构。
您现在位于“集群创建”页面。
本页内容:
- 为集群提供唯一的名称。
- 选择特定区域。您也可以选择可用区,不过,Dataproc 能够自动为您选择一个可用区。对于此 Codelab,请选择“us-central1”“us-central1-c”...
- 选择“标准”集群类型。这样可确保只有一个主节点。
- 在配置节点标签页中,确认创建的工作器数量为两个。
- 在自定义集群部分,选中启用组件网关旁边的复选框。这允许访问集群上的网页界面,包括 Spark 界面、Yarn 节点管理器和 Jupyter 笔记本。
- 在可选组件中,选择 Jupyter 笔记本。这将为集群配置一个 Jupyter 笔记本服务器。
- 保持其他设置不变,然后点击创建集群。
这将启动一个 Dataproc 集群。
3. 启动集群并通过 SSH 连接到该集群
集群状态变为正在运行后,请从 Dataproc 控制台点击集群名称。
点击虚拟机实例标签页,查看集群的主节点和两个工作器节点。
点击主节点旁边的 SSH 以登录主节点。
运行 hdfs 命令可查看目录结构。
hadoop_commands_example
sudo hadoop fs -ls /
sudo hadoop version
sudo hadoop fs -mkdir /test51
sudo hadoop fs -ls /
4. 网页界面和组件网关
在 Dataproc 集群控制台中,点击集群名称,然后点击网页界面标签页。
这会显示可用的网页界面,包括 Jupyter。点击 Jupyter 以打开 Jupyter 笔记本。您可以使用此文件在存储在 GCS 上的 PySpark 中创建笔记本。在 Google Cloud Storage 上存储笔记本,并打开要在此 Codelab 中使用的 PySpark 笔记本。
5. 监控和观察 Spark 作业
在 Dataproc 集群启动并运行后,创建一个 PySpark 批处理作业并将作业提交到 Dataproc 集群。
创建 Google Cloud Storage (GCS) 存储分区,以存储 PySpark 脚本。确保在 Dataproc 集群所在的区域中创建存储分区。
现在,GCS 存储分区已创建,请将以下文件复制到此存储分区中。
https://raw.githubusercontent.com/diptimanr/spark-on-gce/main/test-spark-1.py
此脚本会创建一个示例 Spark DataFrame 并将其编写为 Hive 表。
hive_job.py
from pyspark.sql import SparkSession
from datetime import datetime, date
from pyspark.sql import Row
spark = SparkSession.builder.master("local").enableHiveSupport().getOrCreate()
df = spark.createDataFrame([ (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
(2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)), (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
print("..... Writing data .....")
df.write.mode("overwrite").saveAsTable("test_table_1")
print("..... Complete .....")
在 Dataproc 中将此脚本作为 Spark 批处理作业提交。点击左侧导航菜单中的作业,然后点击提交作业
提供作业 ID 和区域。选择您的集群,并提供您复制的 Spark 脚本的 GCS 位置。此作业将在 Dataproc 上作为 Spark 批处理作业运行。
在属性下添加键 spark.submit.deployMode
和值 client
,以确保驱动程序在 Dataproc 主节点而不是在工作器节点中运行。点击提交,将批量作业提交到 Dataproc。
Spark 脚本将创建一个 Dataframe 并将 test_table_1
写入 Hive 表。
作业成功运行后,您可以在监控标签页下看到控制台输出语句。
Hive 表已创建完毕,请提交另一个 Hive 查询作业,以选择表的内容并显示在控制台上。
创建另一个具有以下属性的作业:
请注意,Job Type 设置为 Hive,且查询来源类型为 Query Text,这意味着我们将在 Query Text 文本框中编写整个 HiveQL 语句。
提交该作业,同时保留其余参数的默认值。
请注意 HiveQL 如何选择所有记录并在控制台上显示。
6. 自动扩缩
自动扩缩是估算“右侧”的集群工作器节点的数量。
Dataproc autoscalingPolicies API 提供了一种自动管理集群资源的机制,并启用集群工作器虚拟机自动扩缩。自动扩缩政策是一项可重复使用的配置,用于描述使用自动扩缩政策的集群工作器应如何扩缩。它定义了扩缩边界、频率和积极性,以便在集群整个生命周期内提供对集群资源的精细控制。
Dataproc 自动扩缩政策是使用 YAML 文件编写的,这些 YAML 文件要么在用于创建集群的 CLI 命令中传递,要么在通过 Cloud 控制台创建集群时从 GCS 存储分区中选择。
以下是 Dataproc 自动扩缩政策的示例:
policy.yaml
workerConfig:
minInstances: 10
maxInstances: 10
secondaryWorkerConfig:
maxInstances: 50
basicAlgorithm:
cooldownPeriod: 4m
yarnConfig:
scaleUpFactor: 0.05
scaleDownFactor: 1.0
gracefulDecommissionTimeout: 1h
7. 配置 Dataproc 可选组件
这将启动一个 Dataproc 集群。
创建 Dataproc 集群时,标准 Apache Hadoop 生态系统组件会自动安装在集群中(请参阅 Dataproc 版本列表)。您可以在创建集群时在集群上安装其他组件(称为可选组件)。
通过控制台创建 Dataproc 集群时,我们启用了可选组件并选择 Jupyter 笔记本作为可选组件。
8. 清理资源
要清理集群,请从 Dataproc 控制台中选择集群后,点击停止。集群停止后,点击删除以删除集群。
删除 Dataproc 集群后,删除复制了代码的 GCS 存储分区。
如需清理资源并停止任何不需要的结算,则需要先停止 Dataproc 集群,然后再将其删除。
在停止并删除集群之前,请确保写入 HDFS 存储空间的所有数据都复制到 GCS 以实现持久性存储。
如需停止集群,请点击停止。
集群停止后,点击删除以删除集群。
在确认对话框中,点击删除以删除集群。