在 Dataproc 上使用 PySpark 预处理 BigQuery 数据

1. 概览

本 Codelab 将介绍如何在 Google Cloud Platform 上结合使用 Apache SparkDataproc 来创建数据处理流水线。数据科学和数据工程领域有一种常见的使用场景,即,从一个存储位置读取数据,对数据执行转换,然后将数据写入另一个存储位置。常见的转换包括更改数据内容、剥离不必要的信息以及更改文件类型。

在本 Codelab 中,您将了解 Apache Spark,并使用 Dataproc 搭配 PySpark(Apache Spark 的 Python API)、BigQueryGoogle Cloud Storage 和 Reddit 中的数据运行示例流水线。

2. Apache Spark 简介(可选)

根据该网站的说法,“Apache Spark 是一个用于进行大规模数据处理的统一分析引擎”。借助它,您可以在内存中并行分析和处理数据,从而实现跨多台不同机器和节点进行大规模并行计算。它最初于 2014 年发布,是传统 MapReduce 的升级版,至今仍是执行大规模计算的最热门框架之一。Apache Spark 采用 Scala 编写,因此具有 Scala、Java、Python 和 R 中的 API。它包含许多库,例如用于对数据执行 SQL 查询的 Spark SQL、用于流式传输数据的 Spark Streaming、用于机器学习的 MLlib 和用于图处理的 GraphX,所有这些库都运行在 Apache Spark 引擎上。

32add0b6a47bafbc.png

Spark 可以自行运行,也可以利用 YarnMesosKubernetes 等资源管理服务进行扩缩。在本 Codelab 中,您将使用 Dataproc,它采用 Yarn。

Spark 中的数据最初会加载到内存中,称为 RDD(弹性分布式数据集)。自那以后,Spark 的开发就包括了添加两种新的列式数据类型:具有类型的 Dataset 和无类型的 Dataframe。粗略地说,RDD 非常适合处理任何类型的数据,而数据集和数据框架则针对表格数据进行了优化。由于数据集仅适用于 Java 和 Scala API,因此我们将继续使用 PySpark Dataframe API 来完成本 Codelab。如需了解详情,请参阅 Apache Spark 文档

3. 用例

数据工程师通常需要让数据科学家能够轻松访问数据。不过,数据通常在最初是脏数据(在当前状态下难以用于分析),需要先进行清理,然后才能发挥很大作用。例如,从网络抓取的数据可能包含奇怪的编码或多余的 HTML 标记。

在本实验中,您将从 BigQuery 中加载一组以 Reddit 帖子形式存储的数据,将其加载到托管在 Dataproc 上的 Spark 集群中,提取有用信息,并将处理后的数据以压缩 CSV 文件的形式存储在 Google Cloud Storage 中。

be2a4551ece63bfc.png

贵公司首席数据科学家希望让其团队着手解决不同的自然语言处理问题。具体而言,他们有兴趣分析“r/food”子论坛中的数据。您将创建一个数据转储流水线,从 2017 年 1 月到 2019 年 8 月的回填开始。

4. 通过 BigQuery Storage API 访问 BigQuery

随着数据量的扩大,使用 tabledata.list API 方法从 BigQuery 中提取数据可能会非常耗时且效率不高。此方法会返回 JSON 对象列表,并且需要一次顺序读取一页,才能读取整个数据集。

BigQuery Storage API 使用基于 RPC 的协议,显著提升了对 BigQuery 中数据的访问速度。它支持并行数据读写,以及不同的序列化格式,例如 Apache AvroApache Arrow。概括来讲,这意味着性能会显著提升,尤其是在大型数据集上。

在本 Codelab 中,您将使用 spark-bigquery-connector 在 BigQuery 和 Spark 之间读取和写入数据。

5. 创建项目

登录 console.cloud.google.com 中的 Google Cloud Platform 控制台,然后创建一个新项目:

7e541d932b20c074.png

2deefc9295d114ea.png

a92a49afe05008a.png

接下来,您需要在 Cloud 控制台中启用结算功能,才能使用 Google Cloud 资源。

在此 Codelab 中运行仅花费几美元,但是如果您决定使用更多资源或继续让它们运行,费用可能更高。本 Codelab 的最后一部分将引导您清理项目。

Google Cloud Platform 的新用户有资格获享$300 免费试用

6. 设置您的环境

现在,您将按照以下步骤设置环境:

  • 启用 Compute Engine、Dataproc 和 BigQuery Storage API
  • 配置项目设置
  • 创建 Dataproc 集群
  • 创建 Google Cloud Storage 存储分区

启用 API 并配置环境

按 Cloud 控制台右上角的按钮,打开 Cloud Shell。

a10c47ee6ca41c54.png

Cloud Shell 加载完毕后,运行以下命令以启用 Compute Engine、Dataproc 和 BigQuery Storage API:

gcloud services enable compute.googleapis.com \
  dataproc.googleapis.com \
  bigquerystorage.googleapis.com

设置项目的项目 ID。您可以前往项目选择页面并搜索您的项目,找到该项目。此名称可能与您的项目名称不同。

e682e8227aa3c781.png

76d45fb295728542.png

运行以下命令以设置项目 ID

gcloud config set project <project_id>

此处的列表中选择一个区域,以设置项目的区域。例如 us-central1

gcloud config set dataproc/region <region>

为您的 Dataproc 集群选择一个名称,并为其创建一个环境变量。

CLUSTER_NAME=<cluster_name>

创建 Dataproc 集群

通过执行以下命令来创建 Dataproc 集群:

 gcloud beta dataproc clusters create ${CLUSTER_NAME} \
     --worker-machine-type n1-standard-8 \
     --num-workers 8 \
     --image-version 1.5-debian \
     --initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh \
     --metadata 'PIP_PACKAGES=google-cloud-storage' \
     --optional-components=ANACONDA \
     --enable-component-gateway

此命令需要几分钟才能完成。拆解该命令:

这将启动使用您之前提供的名称创建 Dataproc 集群的流程。使用 beta API 将启用 Dataproc 的 Beta 版功能,例如组件网关

gcloud beta dataproc clusters create ${CLUSTER_NAME}

这将设置要为工作器使用的机器类型

--worker-machine-type n1-standard-8

这将设置集群中的工作器数量。

--num-workers 8

这将设置 Dataproc 的映像版本

--image-version 1.5-debian

这将配置要在集群上使用的初始化操作。在这里,您要添加 pip 初始化操作。

--initialization-actions gs://dataproc-initialization-actions/python/pip-install.sh 

这是要包含在集群中的元数据。在这里,您将为 pip 初始化操作提供元数据。

--metadata 'PIP_PACKAGES=google-cloud-storage'

这将设置要安装在集群上的可选组件

--optional-components=ANACONDA

这将启用组件网关,以便您使用 Dataproc 的组件网关查看 Zeppelin、Jupyter 或 Spark 历史记录等常用界面

--enable-component-gateway

如需更深入地了解 Dataproc,请查看此 codelab

创建 Google Cloud Storage 存储分区

您需要一个 Google Cloud Storage 存储分区来存储作业输出。为存储分区确定一个唯一名称,然后运行以下命令以创建新存储分区。存储分区名称在所有用户的所有 Google Cloud 项目中都是唯一的,因此您可能需要使用不同的名称重试几次。如果您未收到 ServiceException,则表示存储分区已成功创建。

BUCKET_NAME=<bucket_name>
gsutil mb gs://${BUCKET_NAME}

7. 探索性数据分析

在执行预处理之前,您应详细了解要处理的数据的性质。为此,您将探索两种数据探索方法。首先,您将使用 BigQuery 网页界面查看一些原始数据,然后使用 PySpark 和 Dataproc 计算每个子版块的帖子数量。

使用 BigQuery 网页界面

首先,使用 BigQuery 网页界面查看数据。在 Cloud 控制台中,点击菜单图标,然后向下滚动并按“BigQuery”以打开 BigQuery 网页界面。

242a597d7045b4da.png

接下来,在 BigQuery 网页界面查询编辑器中运行以下命令。这将返回 2017 年 1 月的数据的 10 个完整行:

select * from fh-bigquery.reddit_posts.2017_01 limit 10;

b333c72d60ae6eb8.png

您可以滚动页面,查看所有可用列以及一些示例。具体而言,您会看到两个列,分别代表每篇帖子的文字内容:“title”(标题)和“selftext”(自述),后者是帖子的正文。另请注意其他列,例如“created_utc”(发布时间,以 UTC 时间表示)和“subreddit”(帖子所在的 subreddit)。

执行 PySpark 作业

在 Cloud Shell 中运行以下命令,克隆包含示例代码的代码库并进入正确的目录:

cd
git clone https://github.com/GoogleCloudPlatform/cloud-dataproc

您可以使用 PySpark 确定每个子版块包含多少帖子。您可以打开 Cloud Shell Editor 并阅读脚本 cloud-dataproc/codelabs/spark-bigquery,然后在下一步中执行该脚本:

5d965c6fb66dbd81.png

797cf71de3449bdb.png

点击 Cloud Editor 中的“打开终端”按钮,切换回 Cloud Shell,然后运行以下命令以执行您的第一个 PySpark 作业:

cd ~/cloud-dataproc/codelabs/spark-bigquery
gcloud dataproc jobs submit pyspark --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    counts_by_subreddit.py

通过此命令,您可以通过 Jobs API 将作业提交到 Dataproc。在这里,您将作业类型指示为 pyspark。您可以提供集群名称、可选参数 以及包含作业的文件的名称。在这里,您提供的参数 --jars 可让您在作业中添加 spark-bigquery-connector。您还可以使用 --driver-log-levels root=FATAL 设置日志输出级别,这样做会抑制除错误之外的所有日志输出。Spark 日志往往会产生大量噪声。

此过程应该需要几分钟才能运行完毕,最终输出应如下所示:

6c185228db47bb18.png

8. 探索 Dataproc 和 Spark 界面

在 Dataproc 上运行 Spark 作业时,您可以使用两个界面来检查作业 / 集群的状态。第一种是 Dataproc 界面,您可以通过点击菜单图标并向下滚动到 Dataproc 找到该界面。在这里,您可以查看当前可用内存以及待处理内存和工作器数量。

6f2987346d15c8e2.png

您还可以点击“作业”标签页查看已完成的作业。您可以点击特定作业的作业 ID,查看作业详情,例如这些作业的日志和输出。114d90129b0e4c88.png

1b2160f0f484594a.png

您还可以查看 Spark 界面。在作业页面中,点击返回箭头,然后点击“Web 界面”。您应该会在“组件网关”下看到多个选项。在设置集群时,您可以通过可选组件启用其中的许多组件。对于本实验,请点击“Spark History Server”。

5da7944327d193dc.png

6a349200289c69c1.png e63b36bdc90ff610.png

系统随即会打开以下窗口:

8f6786760f994fe8.png

所有已完成的作业都会显示在此处,您可以点击任何 application_id 来详细了解相应作业。同样,您可以点击着陆页底部的“显示未完成的申请”,查看当前正在运行的所有作业。

9. 运行回填作业

现在,您将运行一个作业,该作业会将数据加载到内存中,提取必要的信息,并将输出转储到 Google Cloud Storage 存储分区中。您将提取每个 Reddit 评论的“标题”“正文”(原始文本)和“创建时间戳”。然后,您将获取这些数据,将其转换为 CSV 文件,进行压缩,并将其加载到 URI 为 gs://${BUCKET_NAME}/reddit_posts/YYYY/MM/food.csv.gz 的存储分区中。

您可以再次使用 Cloud 编辑器,仔细阅读 cloud-dataproc/codelabs/spark-bigquery/backfill.sh代码cloud-dataproc/codelabs/spark-bigquery/backfill.sh 是一个封装容器脚本,用于执行 cloud-dataproc/codelabs/spark-bigquery/backfill.py 中的代码

cd ~/cloud-dataproc/codelabs/spark-bigquery
bash backfill.sh ${CLUSTER_NAME} ${BUCKET_NAME}

您很快就会看到一系列作业完成消息。该作业最多可能需要 15 分钟才能完成。您还可以使用 gsutil 仔细检查存储分区,以验证数据是否成功输出。所有作业完成后,运行以下命令:

gsutil ls gs://${BUCKET_NAME}/reddit_posts/*/*/food.csv.gz

您应该会看到以下输出内容:

a7c3c7b2e82f9fca.png

恭喜,您已成功完成 Reddit 评论数据回填!如果您有兴趣了解如何基于这些数据构建模型,请继续学习 Spark-NLP Codelab。

10. 清理

为避免在完成本快速入门后向您的 GCP 账号收取不必要的费用,请执行以下操作:

  1. 删除您创建的环境的 Cloud Storage 存储分区
  2. 删除 Dataproc 环境

如果您仅为此 Codelab 创建了项目,则可以选择删除该项目:

  1. 在 GCP 控制台中,前往项目页面。
  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在框中输入项目 ID,然后点击关停以删除项目。

许可

此作品已获得 Creative Commons Attribution 3.0 通用许可和 Apache 2.0 许可。