Cloud Dataproc 上的 Apache Spark 和 Jupyter 笔记本

1. 概览

本实验将介绍如何在 Cloud Dataproc 上设置和使用 Apache SparkJupyter 笔记本

Jupyter 笔记本广泛用于探索性数据分析和构建机器学习模型,因为您可以借助 Jupyter 笔记本以交互方式运行代码,并立即查看结果。

不过,设置和使用 Apache Spark 和 Jupyter 笔记本可能很复杂。

b9ed855863c57d6.png

借助 Cloud Dataproc,您可以在大约 90 秒内创建包含 Apache Spark、Jupyter 组件组件网关的 Dataproc 集群,从而快速而轻松地实现这一目的。

学习内容

在此 Codelab 中,您将学习如何:

  • 为您的集群创建 Google Cloud Storage 存储分区
  • 使用 Jupyter 和组件网关创建 Dataproc 集群。
  • 访问 Dataproc 上的 JupyterLab 网页界面
  • 创建使用 Spark BigQuery Storage 连接器的笔记本
  • 运行 Spark 作业并绘制结果。

在 Google Cloud 上运行此实验的总费用约为 $1。如需详细了解 Cloud Dataproc 的价格,请点击此处

2. 创建项目

访问 console.cloud.google.com 并登录 Google Cloud Platform 控制台并创建一个新项目:

7e541d932b20c074

2deefc9295d114ea

a92a49afe05008a.png

接下来,您需要在 Cloud 控制台中启用结算功能,才能使用 Google Cloud 资源。

在此 Codelab 中运行仅花费几美元,但是如果您决定使用更多资源或继续让它们运行,费用可能更高。此 Codelab 的最后一部分将引导您清理项目。

Google Cloud Platform 的新用户有资格获享$300 免费试用

3. 设置您的环境

首先,点击 Cloud 控制台右上角的按钮,打开 Cloud Shell:

a10c47ee6ca41c54.png

Cloud Shell 加载后,运行以下命令以设置上一步中的项目 ID**:**

gcloud config set project <project_id>

您还可以点击 Cloud 控制台左上角的项目,找到项目 ID:

b4b233632ce0c3c4.png

c7e39ffc6dec3765.png

接下来,启用 Dataproc、Compute Engine 和 BigQuery Storage API。

gcloud services enable dataproc.googleapis.com \
  compute.googleapis.com \
  storage-component.googleapis.com \
  bigquery.googleapis.com \
  bigquerystorage.googleapis.com

您也可以在 Cloud 控制台中完成此操作。点击屏幕左上角的菜单图标。

2bfc27ef9ba2ec7d

从下拉菜单中选择“API 管理器”。

408af5f32c4b7c25

点击启用 API 和服务

a9c0e84296a7ba5b.png

搜索并启用以下 API:

  • Compute Engine API
  • Dataproc API
  • BigQuery API
  • BigQuery Storage API

4. 创建 GCS 存储分区

在距离数据最近的区域创建一个 Google Cloud Storage 存储分区,并为其指定一个唯一的名称。

此名称将用于 Dataproc 集群。

REGION=us-central1
BUCKET_NAME=<your-bucket-name>

gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}

您应该会看到以下输出内容

Creating gs://<your-bucket-name>/...

5. 使用 Jupyter 和组件网关

正在创建集群

为集群设置环境变量

REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>

然后,运行此 gcloud 命令以创建集群,其中包含在集群上使用 Jupyter 所需的所有组件。

gcloud beta dataproc clusters create ${CLUSTER_NAME} \
 --region=${REGION} \
 --image-version=1.4 \
 --master-machine-type=n1-standard-4 \
 --worker-machine-type=n1-standard-4 \
 --bucket=${BUCKET_NAME} \
 --optional-components=ANACONDA,JUPYTER \
 --enable-component-gateway 

创建集群时,您应该会看到以下输出内容

Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...

创建集群应该需要大约 90 秒的时间。当集群准备就绪后,您就可以通过 Dataproc Cloud 控制台界面访问集群了。

在等待期间,您可以继续继续阅读以下内容,详细了解 gcloud 命令中使用的标志。

创建集群后,您应该会看到以下输出:

Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].

gcloud dataproc create 命令中使用的标志

以下是 gcloud dataproc create 命令中使用的标志的明细

--region=${REGION}

指定要创建集群的区域和可用区。您可以点击此处查看适用区域列表。

--image-version=1.4

要在集群中使用的映像版本。您可以在此处查看可用版本的列表。

--bucket=${BUCKET_NAME}

指定您之前创建的用于集群的 Google Cloud Storage 存储分区。如果您不提供 GCS 存储分区,系统会为您创建该存储分区。

即使您删除了集群,您的笔记本也会保存到这里,因为 GCS 存储分区不会被删除。

--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4

要用于您的 Dataproc 集群的机器类型。您可以在此处查看可用机器类型列表。

如果您未设置标志 -num-workers,则系统会默认创建 1 个主节点和 2 个工作器节点

--optional-components=ANACONDA,JUPYTER

可选组件设置这些值可在集群上安装 Jupyter 和 Anaconda 的所有必要库(Jupyter 笔记本需要这些库)。

--enable-component-gateway

启用组件网关后,系统会通过 Apache Knox 和反向代理创建 App Engine 链接。借助该链接,用户可以轻松、安全且经过身份验证地访问 Jupyter 和 JupyterLab 网页界面,这意味着您无需再创建 SSH 隧道。

它还将创建指向集群上其他工具的链接,包括 Yarn Resource Manager 和 Spark History Server,它们对于查看作业的性能和集群使用模式非常有用。

6. 创建 Apache Spark 笔记本

访问 JupyterLab 网页界面

集群准备就绪后,您可以前往 Dataproc 集群 - Cloud 控制台,点击您创建的集群并进入“网页界面”标签页,从而找到指向 JupyterLab 网页界面的组件网关链接。

afc40202d555de47.png

您会注意到,您可以访问 Jupyter(经典笔记本界面)或 JupyterLab(称为 Project Jupyter 的下一代界面)。

JupyterLab 提供了许多出色的全新界面功能,因此,如果您刚开始使用笔记本或想要寻求最新的改进,建议您使用 JupyterLab,因为根据官方文档,JupyterLab 最终将取代传统的 Jupyter 界面。

使用 Python 3 内核创建笔记本

a463623f2ebf0518.png

在启动器标签页中,点击 Python 3 笔记本图标以创建具有 Python 3 内核(而非 PySpark 内核)的笔记本,这让您可以在笔记本中配置 SparkSession,并添加使用 BigQuery Storage API 所需的 spark-bigquery-connector

重命名笔记本

196a3276ed07e1f3

右键点击左侧边栏中或顶部导航栏中的笔记本名称,然后将笔记本重命名为“BigQuery Storage &“Spark DataFrames.ipynb”

在笔记本中运行 Spark 代码

fbac38062e5bb9cf.png

在此笔记本中,您将使用 spark-bigquery-connector,它使用 BigQuery Storage API 在 BigQuery 和 Spark 之间读取和写入数据。

通过使用基于 RPC 的协议,BigQuery Storage API 显著改进了在 BigQuery 中访问数据的方式。它支持并行数据读写以及不同的序列化格式,例如 Apache AvroApache Arrow。概括来讲,这可以显著提升性能,尤其是在处理较大的数据集时。

在第一个单元中,检查集群的 Scala 版本,以便您可以添加正确版本的 spark-bigquery-connector jar。

输入内容 [1]

!scala -version

输出 [1]f580e442576b8b1f.png创建一个 Spark 会话并包含 spark-bigquery-connector 软件包。

如果您的 Scala 版本为 2.11,请使用以下软件包。

com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta

如果您的 Scala 版本为 2.12,请使用以下软件包。

com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta

输入内容 [2]

from pyspark.sql import SparkSession
spark = SparkSession.builder \
 .appName('BigQuery Storage & Spark DataFrames') \
 .config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
 .getOrCreate()

启用 repl.eagerEval

这将在每个步骤中输出 DataFrame 的结果,而无需显示 df.show(),并且还改进了输出的格式。

输入内容 [3]

spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

将 BigQuery 表读入 Spark DataFrame

通过读取公共 BigQuery 数据集中的数据来创建 Spark DataFrame。这使用 spark-bigquery-connector 和 BigQuery Storage API 将数据加载到 Spark 集群中。

创建 Spark DataFrame 并从用于获取维基百科网页浏览数据的 BigQuery 公共数据集加载数据。您会发现,您没有对数据运行查询,这是因为在使用 spark-bigquery-connector 将数据加载到 Spark 中进行处理时。当此代码运行时,它实际上不会加载表,因为这是 Spark 中的一项延迟评估,并将在下一步中执行。

输入内容 [4]

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
  .load()

df_wiki_pageviews.printSchema()

输出 [4]

c107a33f6fc30ca.png

选择所需的列,并使用 where()filter() 的别名)应用过滤条件。

运行此代码时,它会触发 Spark 操作,此时系统会从 BigQuery Storage 中读取数据。

输入内容 [5]

df_wiki_en = df_wiki_pageviews \
  .select("datehour", "wiki", "views") \
  .where("views > 1000 AND wiki in ('en', 'en.m')") \

df_wiki_en

输出 [5]

ad363cbe510d625a.png

按标题分组并按网页浏览量排序,即可查看热门网页

输入内容 [6]

import pyspark.sql.functions as F

df_datehour_totals = df_wiki_en \
  .groupBy("datehour") \
  .agg(F.sum('views').alias('total_views'))

df_datehour_totals.orderBy('total_views', ascending=False)

输出 [6]f718abd05afc0f4.png

7. 在笔记本中使用 Python 绘图库

您可以使用 Python 中提供的各种绘制库来绘制 Spark 作业的输出。

将 Spark DataFrame 转换为 Pandas DataFrame

将 Spark DataFrame 转换为 Pandas DataFrame,并将 datehour 设置为索引。如果您想直接在 Python 中处理数据并使用许多可用的 Python 绘图库来绘制数据,这将非常有用。

输入内容 [7]

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()

pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()

输出 [7]

3df2aaa2351f028d

绘制 Pandas Dataframe

导入在笔记本中显示图表所需的 matplotlib 库

输入内容 [8]

import matplotlib.pyplot as plt

使用 Pandas 绘图函数根据 Pandas DataFrame 创建折线图。

输入内容 [9]

pandas_datehour_totals.plot(kind='line',figsize=(12,6));

输出 [9]bade7042c3033594.png

检查笔记本是否保存在 GCS 中

现在,您应该已经在 Dataproc 集群上启动并运行了您的第一个 Jupyter 笔记本。为您的笔记本命名,它会自动保存到创建集群时使用的 GCS 存储分区。

您可以在 Cloud Shell 中使用此 gsutil 命令进行检查

BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter

您应该会看到以下输出内容

gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb

8. 优化提示 - 在内存中缓存数据

在某些情况下,您可能希望将数据保存在内存中,而不是每次都从 BigQuery Storage 中读取数据。

此作业将从 BigQuery 读取数据,并将过滤条件推送到 BigQuery。然后,将在 Apache Spark 中计算聚合。

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_en = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10 AND wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

您可以修改上面的作业以添加表的缓存,现在,Wiki 列上的过滤器将由 Apache Spark 应用于内存。

import pyspark.sql.functions as F

table = "bigquery-public-data.wikipedia.pageviews_2020"

df_wiki_pageviews = spark.read \
 .format("bigquery") \
 .option("table", table) \
 .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
 .load()

df_wiki_all = df_wiki_pageviews \
 .select("title", "wiki", "views") \
 .where("views > 10")

# cache the data in memory
df_wiki_all.cache()

df_wiki_en = df_wiki_all \
 .where("wiki in ('en', 'en.m')")

df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_en_totals.orderBy('total_views', ascending=False)

然后,您可以使用缓存的数据过滤其他 Wiki 语言,而无需再次从 BigQuery 存储空间读取数据,因此运行速度更快。

df_wiki_de = df_wiki_all \
 .where("wiki in ('de', 'de.m')")

df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))

df_wiki_de_totals.orderBy('total_views', ascending=False)

您可以通过运行以下命令来移除缓存:

df_wiki_all.unpersist()

9. 适用于更多用例的示例笔记本

Cloud Dataproc GitHub 代码库包含采用常见 Apache Spark 模式的 Jupyter 笔记本,这些笔记本可以使用各种 Google Cloud Platform 产品和开源工具加载数据、保存数据以及绘制数据图表:

10. 清理

为避免在完成本快速入门后向您的 GCP 账号产生不必要的费用,请执行以下操作:

  1. 为环境删除 Cloud Storage 存储分区以及您创建的存储分区
  2. 删除 Dataproc 环境

如果您专门为此 Codelab 创建了一个项目,也可以选择删除该项目:

  1. 在 GCP Console 中,转到项目页面。
  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在框中输入项目 ID,然后点击关停以删除项目。

许可

此作品已获得知识共享署名 3.0 通用许可和 Apache 2.0 许可。