1. 概览
此 Codelab 将介绍如何在 Google Cloud Platform 上搭配使用 Apache Spark 和 Dataproc 来创建数据处理流水线。数据科学和数据工程中的一种常见使用场景是,从一个存储位置读取数据,对其执行转换并将其写入另一个存储位置。常见的转换包括更改数据内容、去除不必要的信息和更改文件类型。
在此 Codelab 中,您将了解 Apache Spark,并将 Dataproc 与 PySpark(Apache Spark 的 Python API)、BigQuery、Google Cloud Storage 以及来自 Reddit 的数据搭配使用,以运行示例流水线。
2. Apache Spark 简介(可选)
根据该网站的信息,“Apache Spark 是一个用于大规模数据处理的统一分析引擎。”它允许您并行分析和处理内存中的数据,从而实现跨多个不同机器和节点的大规模并行计算。它最初在 2014 年发布,作为对传统 MapReduce 的升级,现在仍然是最流行的用于执行大规模计算的框架之一。Apache Spark 是用 Scala 编写的,随后又有 Scala、Java、Python 和 R 版的 API。它包含大量库,例如用于对数据执行 SQL 查询的 Spark SQL、用于流式数据的 Spark Streaming、用于机器学习的 MLlib 和用于图形处理的 GraphX,所有这些库都在 Apache Spark 引擎上运行。
Spark 可以单独运行,也可以利用 Yarn、Mesos 或 Kubernetes 等资源管理服务进行扩缩。您将在此 Codelab 中使用 Dataproc,它采用 Yarn。
Spark 中的数据最初被加载到内存中,即 RDD(即弹性分布式数据集)。此后,Spark 上的开发工作添加了两种新的列式数据类型:已指定数据类型的 Dataset 和未输入数据类型的 Dataframe。简单来说,RDD 适合任何类型的数据,而数据集和 Dataframe 则针对表格数据进行了优化。由于数据集仅适用于 Java 和 Scala API,因此我们将在本 Codelab 中继续使用 PySpark Dataframe API。如需了解详情,请参阅 Apache Spark 文档。
3. 应用场景
数据工程师通常需要数据科学家能够轻松访问数据。不过,数据最初往往是脏的(在当前状态下很难用于分析),并且需要先清理,然后才能得到大量使用。例如,从网络抄袭的数据可能包含奇怪的编码或无关的 HTML 标记。
在本实验中,您将以 Reddit 帖子的形式从 BigQuery 将一组数据加载到 Dataproc 上托管的 Spark 集群中,提取有用信息,并将处理后的数据以压缩的 CSV 文件格式存储在 Google Cloud Storage 中。
贵公司的首席数据科学家希望让自己的团队解决不同的自然语言处理问题。具体来说,他们对分析 subreddit“r/food”中的数据很感兴趣。您将从 2017 年 1 月至 2019 年 8 月的回填开始,为数据转储创建流水线。
4. 通过 BigQuery Storage API 访问 BigQuery
事实证明,使用 tabledata.list API 方法从 BigQuery 中提取数据可能非常耗时,而且随着数据量的增加,效率低下。此方法会返回 JSON 对象列表,并且需要一次按顺序读取一页内容,才能读取整个数据集。
通过使用基于 RPC 的协议,BigQuery Storage API 显著改进了在 BigQuery 中访问数据的方式。它支持并行数据读写以及不同的序列化格式,例如 Apache Avro 和 Apache Arrow。概括来讲,这可以显著提升性能,尤其是在处理较大的数据集时。
在此 Codelab 中,您将使用 spark-bigquery-connector 在 BigQuery 和 Spark 之间读取和写入数据。
5. 创建项目
前往 console.cloud.google.com 登录 Google Cloud Platform 控制台并创建一个新项目:
接下来,您需要在 Cloud 控制台中启用结算功能,才能使用 Google Cloud 资源。
在此 Codelab 中运行仅花费几美元,但是如果您决定使用更多资源或继续让它们运行,费用可能更高。此 Codelab 的最后一部分将引导您清理项目。
Google Cloud Platform 的新用户有资格获享$300 免费试用。
6. 设置您的环境
现在,您需要按照以下步骤设置环境:
- 启用 Compute Engine、Dataproc 和 BigQuery Storage API
- 配置项目设置
- 创建 Dataproc 集群
- 创建 Google Cloud Storage 存储分区
启用 API 并配置您的环境
按 Cloud 控制台右上角的按钮,打开 Cloud Shell。
Cloud Shell 加载后,运行以下命令以启用 Compute Engine、Dataproc 和 BigQuery Storage API:
gcloud services enable compute.googleapis.com \
dataproc.googleapis.com \
bigquerystorage.googleapis.com
设置项目的项目 ID。您可以前往项目选择页面并搜索您的项目,以找到该项目。该名称可能与项目名称不同。
运行以下命令以设置项目 ID:
gcloud config set project <project_id>
从此处的列表中选择一个区域,为您的项目设置区域。例如 us-central1
。
gcloud config set dataproc/region <region>
为您的 Dataproc 集群选择一个名称并为其创建环境变量。
CLUSTER_NAME=<cluster_name>
创建 Dataproc 集群
通过执行以下命令创建 Dataproc 集群:
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--worker-machine-type n1-standard-8 \
--num-workers 8 \
--image-version 1.5-debian \
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
--metadata 'PIP_PACKAGES=google-cloud-storage' \
--optional-components=ANACONDA \
--enable-component-gateway
此命令需要几分钟才能完成。要对此命令进行细分,请执行以下操作:
这将使用您之前提供的名称开始创建 Dataproc 集群。使用 beta
API 将启用 Dataproc 的 Beta 版功能,例如组件网关。
gcloud beta dataproc clusters create ${CLUSTER_NAME}
--worker-machine-type n1-standard-8
这将设置集群将拥有的工作器数量。
--num-workers 8
这将设置 Dataproc 的映像版本。
--image-version 1.5-debian
这将配置要在集群上使用的初始化操作。在这里,您将添加 pip 初始化操作。
--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh
这是要包含在集群上的元数据。在这里,您将提供 pip
初始化操作的元数据。
--metadata 'PIP_PACKAGES=google-cloud-storage'
这将设置要在集群上安装的可选组件。
--optional-components=ANACONDA
这将启用组件网关,让您可以使用 Dataproc 的组件网关查看常见界面,例如 Zeppelin、Jupyter 或 Spark 历史记录
--enable-component-gateway
如需深入了解 Dataproc,请查看此 Codelab。
创建 Google Cloud Storage 存储分区
您需要一个 Google Cloud Storage 存储分区来存储作业输出。确定存储分区的唯一名称,并运行以下命令创建新的存储分区。所有用户的存储分区名称在所有 Google Cloud 项目中都是唯一的,因此您可能需要尝试使用其他名称多次尝试此操作。如果您未收到 ServiceException
,则表示存储分区已成功创建。
BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}
7. 探索性数据分析
在执行预处理之前,您应该详细了解所处理数据的性质。为此,您需要探索两种数据探索方法。首先,您将使用 BigQuery 网页界面查看一些原始数据,然后使用 PySpark 和 Dataproc 计算每个 subreddit 的帖子数。
使用 BigQuery 网页界面
首先使用 BigQuery 网页界面查看您的数据。通过 Cloud 控制台的菜单图标,向下滚动,然后按“BigQuery”以打开 BigQuery 网页界面。
接下来,在 BigQuery 网页界面查询编辑器中运行以下命令。这将返回 2017 年 1 月的 10 个完整的数据行:
select * from fh-bigquery.reddit_posts.2017_01 limit 10;
您可以滚动页面,查看所有可用的列以及一些示例。具体而言,您会看到代表每篇博文的文字内容的两列:“标题”和“selftext”,后者是帖子的正文。另请注意其他列,例如“created_utc”。这是博文的 utc 时间,并且“subreddit”也就是帖子所在的 subreddit
执行 PySpark 作业
在 Cloud Shell 中运行以下命令,以克隆包含示例代码的代码库,并通过 cd 命令进入正确的目录:
cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc
您可以使用 PySpark 确定每个 subreddit 对应的帖子数。您可以打开 Cloud Editor 并读取脚本 cloud-dataproc/codelabs/spark-bigquery
,然后在下一步中执行该脚本:
点击“打开终端”按钮切换回 Cloud Shell 并运行以下命令以执行您的第一个 PySpark 作业:
cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
--jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
--driver-log-levels root=FATAL \
counts_by_subreddit.py
此命令允许您通过 Jobs API 将作业提交到 Dataproc。在这里,您将作业类型指示为 pyspark
。您可以提供集群名称、可选参数 和包含该作业的文件的名称。在这里,您将提供参数 --jars
,它允许您在作业中添加 spark-bigquery-connector
。您还可以使用 --driver-log-levels root=FATAL
设置日志输出级别,这会抑制除错误之外的所有日志输出。Spark 日志往往非常嘈杂。
运行这需要几分钟的时间,最终输出应如下所示:
8. 探索 Dataproc 和 Spark 界面
在 Dataproc 上运行 Spark 作业时,您可以使用两个界面来检查作业 / 集群的状态。第一个是 Dataproc 界面,您可以点击菜单图标并向下滚动到 Dataproc 来找到该界面。在这里,您可以查看当前可用内存、待处理内存和工作器数量。
您还可以点击“作业”标签页查看已完成的作业。您可以通过点击特定作业的作业 ID 来查看作业详情,例如日志和这些作业的输出。
您还可以查看 Spark 界面。在作业页面中,点击返回箭头,然后点击“网页界面”。您应该会在组件网关下看到几个选项。在设置集群时,其中许多功能都可以通过可选组件启用。在本实验中,请点击“Spark History Server”。
系统应打开以下窗口:
所有已完成的作业都将显示在此处,您可以点击任意 application_id 以详细了解相应作业。同样,您可以点击“显示未完成的应用”查看当前正在运行的所有作业。
9. 运行回填作业
您现在将运行一个作业,该作业将数据加载到内存中、提取必要信息并将输出转储到 Google Cloud Storage 存储分区中。您需要将“title”“body”(原始文本)和“timestamp created”(已创建时间戳)每条 Reddit 评论的链接。然后,您需要获取这些数据,将其转换为 CSV 文件,然后将其压缩,然后将它加载到 URI 为 gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz 的存储分区中。
您可以再次参考 Cloud Editor 以通读代码 cloud-dataproc/codelabs/spark-bigquery/backfill.sh
,该代码是用于在 cloud-dataproc/codelabs/spark-bigquery/backfill.py
中执行代码 的封装容器脚本。
cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}
您应该很快就会看到大量作业完成消息。该作业最多可能需要 15 分钟才能完成。您还可以使用 gsutil 仔细检查存储分区,以验证数据输出是否成功。完成所有作业后,请运行以下命令:
gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz
您应该会看到以下输出内容:
恭喜,您已成功完成 Reddit 评论数据的回填!如果您有兴趣了解如何基于这些数据构建模型,请继续学习 Spark-NLP Codelab。
10. 清理
为避免在完成本快速入门后向您的 GCP 账号产生不必要的费用,请执行以下操作:
- 为环境删除 Cloud Storage 存储分区以及您创建的存储分区
- 删除 Dataproc 环境。
如果您专门为此 Codelab 创建了一个项目,也可以选择删除该项目:
- 在 GCP Console 中,转到项目页面。
- 在项目列表中,选择要删除的项目,然后点击删除。
- 在框中输入项目 ID,然后点击关停以删除项目。
许可
此作品已获得知识共享署名 3.0 通用许可和 Apache 2.0 许可。