Google Compute Engine 上的 Dataproc

1. 简介 - Google Dataproc

Dataproc 是一项具有高度可扩容性的全托管式服务,用于运行 Apache Spark、Apache Flink、Presto 和众多其他开源工具和框架。使用 Dataproc 可以大规模实现数据湖现代化改造、ETL / ELT 和安全数据科学。Dataproc 还可与多种 Google Cloud 服务完全集成,包括 BigQueryCloud StorageVertex AIDataplex

Dataproc 有三种版本:

  • 借助 Dataproc Serverless,您无需配置基础架构和自动扩缩功能即可运行 PySpark 作业。Dataproc Serverless 支持 PySpark 批量工作负载和会话 / 笔记本。
  • Dataproc on Google Compute Engine 可让您为基于 YARN 的 Spark 工作负载以及 Flink 和 Presto 等开源工具管理 Hadoop YARN 集群。您可以根据需要纵向或横向扩缩云端集群,包括自动扩缩。
  • Dataproc on Google Kubernetes Engine 可让您在 GKE 基础架构中配置 Dataproc 虚拟集群,以方便提交 Spark、PySpark、SparkR 或 Spark SQL 作业。

2. 在 Google Cloud VPC 中创建 Dataproc 集群

在此步骤中,您将使用 Google Cloud 控制台在 Google Cloud 上创建 Dataproc 集群。

第一步是在控制台上启用 Dataproc 服务 API。启用后,在搜索栏中搜索“Dataproc”,然后点击创建集群

选择 Compute Engine 上的集群,以使用 Google Compute Engine(GCE) 虚拟机作为运行 Dataproc 集群的基础架构。

a961b2e8895e88da.jpeg

您现在位于“集群创建”页面。

9583c91204a09c12.jpeg

本页内容:

  • 为集群提供一个唯一的名称。
  • 选择特定区域。您也可以选择可用区,不过 Dataproc 能够自动为您选择可用区。在此 Codelab 中,请选择“us-central1”和“us-central1-c”。
  • 选择“标准”集群类型。这可确保只有一个主节点。
  • 配置节点标签页中,确认创建的工作器数量为 2。
  • 自定义集群部分中,选中启用组件网关旁边的复选框。这样,您就可以访问集群上的网页界面,包括 Spark 界面、Yarn 节点管理器和 Jupyter 笔记本。
  • 可选组件中,选择 Jupyter 笔记本。此配置用于配置具有 Jupyter 笔记本服务器的集群。
  • 将所有其他设置保留原样,然后点击创建集群

这会启动一个 Dataproc 集群。

3. 启动集群并通过 SSH 连接到集群

当集群状态变为正在运行后,在 Dataproc 控制台中点击集群名称。

7332f1c2cb25807d.jpeg

点击虚拟机实例标签页,查看集群的主节点和两个工作器节点。

25be1578e00f669f.jpeg

点击主节点旁边的 SSH 以登录主节点。

2810ffd97f315bdb.jpeg

运行 hdfs 命令以查看目录结构。

hadoop_commands_example

sudo hadoop fs -ls /
sudo hadoop version
sudo hadoop fs -mkdir /test51 
sudo hadoop fs -ls /

4. 网页界面和组件网关

Dataproc 集群控制台中,点击集群的名称,然后点击 WEB 界面标签页。

6398f71d6293d6ff.jpeg

这会显示可用的 Web 界面,包括 Jupyter。点击 Jupyter 以打开 Jupyter 笔记本。您可以使用此功能在 GCS 中创建以 PySpark 编写的笔记本。将笔记本存储在 Google Cloud Storage 中,然后打开一个 PySpark 笔记本以在此 Codelab 中使用。

5. 监控和观察 Spark 作业

在 Dataproc 集群正常运行后,创建一个 PySpark 批量作业,并将该作业提交到 Dataproc 集群。

创建一个 Google Cloud Storage (GCS) 存储分区,用于存储 PySpark 脚本。请务必在 Dataproc 集群所在的区域中创建存储分区。

679fd2f76806f4e2.jpeg

现在,GCS 存储分区已创建完毕,请将以下文件复制到此存储分区中。

https://raw.githubusercontent.com/diptimanr/spark-on-gce/main/test-spark-1.py

此脚本会创建一个示例 Spark DataFrame,并将其写入为 Hive 表。

hive_job.py

from pyspark.sql import SparkSession
from datetime import datetime, date
from pyspark.sql import Row

spark = SparkSession.builder.master("local").enableHiveSupport().getOrCreate()

df = spark.createDataFrame([ (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
        (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)), (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
    ], schema='a long, b double, c string, d date, e timestamp')
print("..... Writing data .....")
df.write.mode("overwrite").saveAsTable("test_table_1")
print("..... Complete .....")

将此脚本作为 Spark 批量作业在 Dataproc 中提交。点击左侧导航菜单中的作业,然后点击提交作业

5767fc7c50b706d3.jpeg

提供任务 ID区域。选择您的集群,并提供您复制的 Spark 脚本的 GCS 位置。此作业将作为 Spark 批量作业在 Dataproc 上运行。

属性下,添加键 spark.submit.deployMode 和值 client,以确保驱动程序在 Dataproc 主节点中运行,而不是在工作器节点中运行。点击提交,将批量作业提交到 Dataproc。

a7ca90f5132faa31.jpeg

Spark 脚本将创建一个 DataFrame 并写入 Hive 表 test_table_1

作业成功运行后,您可以在监控标签页下看到控制台打印语句。

bdec2f3ae1055f9.jpeg

现在,Hive 表已创建完毕,请提交另一个 Hive 查询作业,以选择该表的内容并在控制台上显示。

创建另一个具有以下属性的作业:

c16f02d1b3afaa27.jpeg

请注意,作业类型已设置为 Hive,查询源类型为查询文本,这意味着我们将在查询文本文本框中编写整个 HiveQL 语句。

提交作业,其余参数保留为默认值。

e242e50bc2519bf4.jpeg

请注意,HiveQL 如何选择所有记录并显示在控制台上。

6. 自动扩缩

自动扩缩是指估算工作负载的“适当”集群工作器节点数量的任务。

Dataproc AutoscalingPolicies API 提供自动管理集群资源的机制,还启用了集群工作器虚拟机的自动扩缩功能。自动扩缩政策是一项可重复使用的配置,用于描述使用该自动扩缩政策的集群工作器的扩缩方式。该政策定义了扩缩边界、频率和积极性,以在整个集群生命周期内提供对集群资源的精细控制。

Dataproc 自动扩缩政策使用 YAML 文件编写,这些 YAML 文件要么在用于创建集群的 CLI 命令中传递,要么在通过 Cloud 控制台创建集群时从 GCS 存储分区中选择。

以下是一个 Dataproc 自动扩缩政策示例:

policy.yaml

workerConfig:
  minInstances: 10
  maxInstances: 10
secondaryWorkerConfig:
  maxInstances: 50
basicAlgorithm:
  cooldownPeriod: 4m
  yarnConfig:
    scaleUpFactor: 0.05
    scaleDownFactor: 1.0
    gracefulDecommissionTimeout: 1h

7. 配置 Dataproc 可选组件

这会启动一个 Dataproc 集群。

创建 Dataproc 集群时,标准 Apache Hadoop 生态系统组件会自动安装在集群中(请参阅 Dataproc 版本列表)。您可以在创建集群时在集群上安装称为可选组件的其他组件。

e39cc34245af3f01.jpeg

在控制台中创建 Dataproc 集群时,我们已启用可选组件,并选择 Jupyter 笔记本作为可选组件。

8. 清理资源

如需清理集群,请在 Dataproc 控制台中选择集群后,点击停止。集群停止后,点击删除以删除集群。

删除 Dataproc 集群后,删除复制了代码的 GCS 存储分区。

为了清理资源并避免产生任何不必要的结算费用,您需要先停止 Dataproc 集群,然后再将其删除。

在停止并删除集群之前,请确保已将写入 HDFS 存储空间的所有数据复制到 GCS 以进行持久存储。

如需停止集群,请点击停止

52065de928ab52e7.jpeg

集群停止后,点击删除以删除集群。

在确认对话框中,点击删除以删除集群。

52065de928ab52e7.jpeg