1. 概览
本实验将介绍如何在 Cloud Dataproc 上设置和使用 Apache Spark 和 Jupyter 笔记本。
Jupyter 笔记本广泛用于探索性数据分析和构建机器学习模型,因为您可以借助 Jupyter 笔记本以交互方式运行代码,并立即查看结果。
不过,设置和使用 Apache Spark 和 Jupyter 笔记本可能很复杂。
借助 Cloud Dataproc,您可以在大约 90 秒内创建包含 Apache Spark、Jupyter 组件和组件网关的 Dataproc 集群,从而快速而轻松地实现这一目的。
学习内容
在此 Codelab 中,您将学习如何:
- 为您的集群创建 Google Cloud Storage 存储分区
- 使用 Jupyter 和组件网关创建 Dataproc 集群。
- 访问 Dataproc 上的 JupyterLab 网页界面
- 创建使用 Spark BigQuery Storage 连接器的笔记本
- 运行 Spark 作业并绘制结果。
在 Google Cloud 上运行此实验的总费用约为 $1。如需详细了解 Cloud Dataproc 的价格,请点击此处。
2. 创建项目
访问 console.cloud.google.com 并登录 Google Cloud Platform 控制台并创建一个新项目:
接下来,您需要在 Cloud 控制台中启用结算功能,才能使用 Google Cloud 资源。
在此 Codelab 中运行仅花费几美元,但是如果您决定使用更多资源或继续让它们运行,费用可能更高。此 Codelab 的最后一部分将引导您清理项目。
Google Cloud Platform 的新用户有资格获享$300 免费试用。
3. 设置您的环境
首先,点击 Cloud 控制台右上角的按钮,打开 Cloud Shell:
Cloud Shell 加载后,运行以下命令以设置上一步中的项目 ID**:**
gcloud config set project <project_id>
您还可以点击 Cloud 控制台左上角的项目,找到项目 ID:
接下来,启用 Dataproc、Compute Engine 和 BigQuery Storage API。
gcloud services enable dataproc.googleapis.com \
compute.googleapis.com \
storage-component.googleapis.com \
bigquery.googleapis.com \
bigquerystorage.googleapis.com
您也可以在 Cloud 控制台中完成此操作。点击屏幕左上角的菜单图标。
从下拉菜单中选择“API 管理器”。
点击启用 API 和服务。
搜索并启用以下 API:
- Compute Engine API
- Dataproc API
- BigQuery API
- BigQuery Storage API
4. 创建 GCS 存储分区
在距离数据最近的区域创建一个 Google Cloud Storage 存储分区,并为其指定一个唯一的名称。
此名称将用于 Dataproc 集群。
REGION=us-central1
BUCKET_NAME=<your-bucket-name>
gsutil mb -c standard -l ${REGION} gs://${BUCKET_NAME}
您应该会看到以下输出内容
Creating gs://<your-bucket-name>/...
5. 使用 Jupyter 和组件网关
正在创建集群
为集群设置环境变量
REGION=us-central1
ZONE=us-central1-a
CLUSTER_NAME=spark-jupyter
BUCKET_NAME=<your-bucket-name>
然后,运行此 gcloud 命令以创建集群,其中包含在集群上使用 Jupyter 所需的所有组件。
gcloud beta dataproc clusters create ${CLUSTER_NAME} \
--region=${REGION} \
--image-version=1.4 \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--bucket=${BUCKET_NAME} \
--optional-components=ANACONDA,JUPYTER \
--enable-component-gateway
创建集群时,您应该会看到以下输出内容
Waiting on operation [projects/spark-jupyter/regions/us-central1/operations/abcd123456].
Waiting for cluster creation operation...
创建集群应该需要大约 90 秒的时间。当集群准备就绪后,您就可以通过 Dataproc Cloud 控制台界面访问集群了。
在等待期间,您可以继续继续阅读以下内容,详细了解 gcloud 命令中使用的标志。
创建集群后,您应该会看到以下输出:
Created [https://dataproc.googleapis.com/v1beta2/projects/project-id/regions/us-central1/clusters/spark-jupyter] Cluster placed in zone [us-central1-a].
gcloud dataproc create 命令中使用的标志
以下是 gcloud dataproc create 命令中使用的标志的明细
--region=${REGION}
指定要创建集群的区域和可用区。您可以点击此处查看适用区域列表。
--image-version=1.4
要在集群中使用的映像版本。您可以在此处查看可用版本的列表。
--bucket=${BUCKET_NAME}
指定您之前创建的用于集群的 Google Cloud Storage 存储分区。如果您不提供 GCS 存储分区,系统会为您创建该存储分区。
即使您删除了集群,您的笔记本也会保存到这里,因为 GCS 存储分区不会被删除。
--master-machine-type=n1-standard-4
--worker-machine-type=n1-standard-4
要用于您的 Dataproc 集群的机器类型。您可以在此处查看可用机器类型列表。
如果您未设置标志 -num-workers,则系统会默认创建 1 个主节点和 2 个工作器节点
--optional-components=ANACONDA,JUPYTER
为可选组件设置这些值可在集群上安装 Jupyter 和 Anaconda 的所有必要库(Jupyter 笔记本需要这些库)。
--enable-component-gateway
启用组件网关后,系统会通过 Apache Knox 和反向代理创建 App Engine 链接。借助该链接,用户可以轻松、安全且经过身份验证地访问 Jupyter 和 JupyterLab 网页界面,这意味着您无需再创建 SSH 隧道。
它还将创建指向集群上其他工具的链接,包括 Yarn Resource Manager 和 Spark History Server,它们对于查看作业的性能和集群使用模式非常有用。
6. 创建 Apache Spark 笔记本
访问 JupyterLab 网页界面
集群准备就绪后,您可以前往 Dataproc 集群 - Cloud 控制台,点击您创建的集群并进入“网页界面”标签页,从而找到指向 JupyterLab 网页界面的组件网关链接。
您会注意到,您可以访问 Jupyter(经典笔记本界面)或 JupyterLab(称为 Project Jupyter 的下一代界面)。
JupyterLab 提供了许多出色的全新界面功能,因此,如果您刚开始使用笔记本或想要寻求最新的改进,建议您使用 JupyterLab,因为根据官方文档,JupyterLab 最终将取代传统的 Jupyter 界面。
使用 Python 3 内核创建笔记本
在启动器标签页中,点击 Python 3 笔记本图标以创建具有 Python 3 内核(而非 PySpark 内核)的笔记本,这让您可以在笔记本中配置 SparkSession,并添加使用 BigQuery Storage API 所需的 spark-bigquery-connector。
重命名笔记本
右键点击左侧边栏中或顶部导航栏中的笔记本名称,然后将笔记本重命名为“BigQuery Storage &“Spark DataFrames.ipynb”
在笔记本中运行 Spark 代码
在此笔记本中,您将使用 spark-bigquery-connector,它使用 BigQuery Storage API 在 BigQuery 和 Spark 之间读取和写入数据。
通过使用基于 RPC 的协议,BigQuery Storage API 显著改进了在 BigQuery 中访问数据的方式。它支持并行数据读写以及不同的序列化格式,例如 Apache Avro 和 Apache Arrow。概括来讲,这可以显著提升性能,尤其是在处理较大的数据集时。
在第一个单元中,检查集群的 Scala 版本,以便您可以添加正确版本的 spark-bigquery-connector jar。
输入内容 [1]:
!scala -version
输出 [1]:创建一个 Spark 会话并包含 spark-bigquery-connector 软件包。
如果您的 Scala 版本为 2.11,请使用以下软件包。
com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta
如果您的 Scala 版本为 2.12,请使用以下软件包。
com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta
输入内容 [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('BigQuery Storage & Spark DataFrames') \
.config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.11:0.15.1-beta') \
.getOrCreate()
启用 repl.eagerEval
这将在每个步骤中输出 DataFrame 的结果,而无需显示 df.show(),并且还改进了输出的格式。
输入内容 [3]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)
将 BigQuery 表读入 Spark DataFrame
通过读取公共 BigQuery 数据集中的数据来创建 Spark DataFrame。这使用 spark-bigquery-connector 和 BigQuery Storage API 将数据加载到 Spark 集群中。
创建 Spark DataFrame 并从用于获取维基百科网页浏览数据的 BigQuery 公共数据集加载数据。您会发现,您没有对数据运行查询,这是因为在使用 spark-bigquery-connector 将数据加载到 Spark 中进行处理时。当此代码运行时,它实际上不会加载表,因为这是 Spark 中的一项延迟评估,并将在下一步中执行。
输入内容 [4]:
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_pageviews.printSchema()
输出 [4]:
选择所需的列,并使用 where()
(filter()
的别名)应用过滤条件。
运行此代码时,它会触发 Spark 操作,此时系统会从 BigQuery Storage 中读取数据。
输入内容 [5]:
df_wiki_en = df_wiki_pageviews \
.select("datehour", "wiki", "views") \
.where("views > 1000 AND wiki in ('en', 'en.m')") \
df_wiki_en
输出 [5]:
按标题分组并按网页浏览量排序,即可查看热门网页
输入内容 [6]:
import pyspark.sql.functions as F
df_datehour_totals = df_wiki_en \
.groupBy("datehour") \
.agg(F.sum('views').alias('total_views'))
df_datehour_totals.orderBy('total_views', ascending=False)
输出 [6]:
7. 在笔记本中使用 Python 绘图库
您可以使用 Python 中提供的各种绘制库来绘制 Spark 作业的输出。
将 Spark DataFrame 转换为 Pandas DataFrame
将 Spark DataFrame 转换为 Pandas DataFrame,并将 datehour 设置为索引。如果您想直接在 Python 中处理数据并使用许多可用的 Python 绘图库来绘制数据,这将非常有用。
输入内容 [7]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandas_datehour_totals = df_datehour_totals.toPandas()
pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()
输出 [7]:
绘制 Pandas Dataframe
导入在笔记本中显示图表所需的 matplotlib 库
输入内容 [8]:
import matplotlib.pyplot as plt
使用 Pandas 绘图函数根据 Pandas DataFrame 创建折线图。
输入内容 [9]:
pandas_datehour_totals.plot(kind='line',figsize=(12,6));
输出 [9]:
检查笔记本是否保存在 GCS 中
现在,您应该已经在 Dataproc 集群上启动并运行了您的第一个 Jupyter 笔记本。为您的笔记本命名,它会自动保存到创建集群时使用的 GCS 存储分区。
您可以在 Cloud Shell 中使用此 gsutil 命令进行检查
BUCKET_NAME=<your-bucket-name>
gsutil ls gs://${BUCKET_NAME}/notebooks/jupyter
您应该会看到以下输出内容
gs://bucket-name/notebooks/jupyter/
gs://bucket-name/notebooks/jupyter/BigQuery Storage & Spark DataFrames.ipynb
8. 优化提示 - 在内存中缓存数据
在某些情况下,您可能希望将数据保存在内存中,而不是每次都从 BigQuery Storage 中读取数据。
此作业将从 BigQuery 读取数据,并将过滤条件推送到 BigQuery。然后,将在 Apache Spark 中计算聚合。
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_en = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10 AND wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
您可以修改上面的作业以添加表的缓存,现在,Wiki 列上的过滤器将由 Apache Spark 应用于内存。
import pyspark.sql.functions as F
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = spark.read \
.format("bigquery") \
.option("table", table) \
.option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'") \
.load()
df_wiki_all = df_wiki_pageviews \
.select("title", "wiki", "views") \
.where("views > 10")
# cache the data in memory
df_wiki_all.cache()
df_wiki_en = df_wiki_all \
.where("wiki in ('en', 'en.m')")
df_wiki_en_totals = df_wiki_en \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_en_totals.orderBy('total_views', ascending=False)
然后,您可以使用缓存的数据过滤其他 Wiki 语言,而无需再次从 BigQuery 存储空间读取数据,因此运行速度更快。
df_wiki_de = df_wiki_all \
.where("wiki in ('de', 'de.m')")
df_wiki_de_totals = df_wiki_de \
.groupBy("title") \
.agg(F.sum('views').alias('total_views'))
df_wiki_de_totals.orderBy('total_views', ascending=False)
您可以通过运行以下命令来移除缓存:
df_wiki_all.unpersist()
9. 适用于更多用例的示例笔记本
Cloud Dataproc GitHub 代码库包含采用常见 Apache Spark 模式的 Jupyter 笔记本,这些笔记本可以使用各种 Google Cloud Platform 产品和开源工具加载数据、保存数据以及绘制数据图表:
10. 清理
为避免在完成本快速入门后向您的 GCP 账号产生不必要的费用,请执行以下操作:
- 为环境删除 Cloud Storage 存储分区以及您创建的存储分区
- 删除 Dataproc 环境。
如果您专门为此 Codelab 创建了一个项目,也可以选择删除该项目:
- 在 GCP Console 中,转到项目页面。
- 在项目列表中,选择要删除的项目,然后点击删除。
- 在框中输入项目 ID,然后点击关停以删除项目。
许可
此作品已获得知识共享署名 3.0 通用许可和 Apache 2.0 许可。