将 Notebooks 与 Google Cloud Dataflow 搭配使用

1. 简介

Cloud-Dataflow.png

Google Cloud Dataflow

上次更新时间:2023 年 7 月 5 日

什么是 Dataflow?

Dataflow 是一种用于执行多种数据处理模式的托管式服务。本网站上的文档介绍了如何使用 Dataflow 部署批量数据处理流水线和流式数据处理流水线,包括各项服务功能的使用说明。

Apache Beam SDK 是一个开源编程模型,您可以使用它来开发批处理流水线和流处理流水线。您可以使用 Apache Beam 程序创建流水线,然后在 Dataflow 服务上运行这些流水线。Apache Beam 文档提供了有关 Apache Beam 编程模型、SDK 和其他运行程序的深入概念信息和参考资料。

快速进行流式数据分析

Dataflow 可实现快速、简化的流式数据流水线开发,且数据延迟时间更短。

简化运营和管理

Dataflow 的无服务器方法消除了数据工程工作负载的运营开销,让团队可以专注于编程,而无需管理服务器集群。

降低总拥有成本

资源自动扩缩功能搭配费用优化的批处理功能,使得 Dataflow 可提供几乎无限的容量来管理季节性和峰值工作负载,而不会超支。

主要元素

自动化资源管理和动态工作负载再平衡

Dataflow 可自动预配和管理处理资源,以尽可能缩短延迟时间并提高利用率,因此您无需手动启动或预留实例。工作分区也是自动执行的,并且经过了优化以动态再平衡进度滞后的工作。无需苦苦寻找“热点键”或预处理输入数据。

横向自动扩缩

横向自动扩缩工作器资源规模以实现最佳的吞吐量,从而获得更好的整体性价比。

面向批处理的灵活资源调度价格

对于调度时间较为灵活的处理作业(例如夜间作业),采用灵活资源调度 (FlexRS) 模式进行批处理可以享受更低的价格。系统会将这些灵活作业放入队列中,并保证在六小时内检索并执行这些作业。

本次变更包含的内容

借助 Apache Beam 交互式运行程序和 JruyterLab 笔记本,您可以迭代地开发流水线、检查流水线图,并在“读取-求值-输出”循环 (REPL) 工作流中解析独立的 PCollection。这些 Apache Beam 笔记本通过 Vertex AI Workbench 提供,这是一项代管式服务,用于托管预安装了最新数据科学和机器学习框架的笔记本虚拟机。

此 Codelab 重点介绍 Apache Beam 笔记本引入的功能。

学习内容

  • 如何创建笔记本实例
  • 创建基本流水线
  • 从无界限来源读取数据
  • 直观显示数据
  • 从笔记本启动 Dataflow 作业
  • 保存笔记本

所需条件

  • 启用了结算功能的 Google Cloud Platform 项目。
  • 已启用 Google Cloud Dataflow 和 Google Cloud PubSub。

2. 准备工作

  1. 在 Cloud 控制台的项目选择器页面上,选择或创建 Cloud 项目。

确保您已启用以下 API:

  • Dataflow API
  • Cloud Pub/Sub API
  • Compute Engine
  • Notebooks API

您可以通过查看 API 的 &服务页面。

在本指南中,我们将读取 Pub/Sub 订阅中的数据,因此请确保 Compute Engine 默认服务账号具有 Editor 角色,或向其授予 Pub/Sub Editor 角色。

3. Apache Beam 笔记本使用入门

启动 Apache Beam 笔记本实例

  1. 在控制台上启动 Dataflow:

  1. 使用左侧菜单选择 Workbench 页面。
  2. 确保您位于用户管理的笔记本标签页上。
  3. 在工具栏中,点击新建笔记本
  4. 选择 Apache Beam > Without GPUs
  5. 新建笔记本页面上,为笔记本虚拟机选择一个子网,然后点击创建
  6. 链接生效后,点击打开 JupyterLab。Vertex AI Workbench 会创建一个新的 Apache Beam 笔记本实例。

4. 创建流水线

创建笔记本实例

导航到文件 >新建 >Notebook 中,并选择 Apache Beam 2.47 或更高版本的内核。

开始将代码添加到笔记本

  • 复制每个部分中的代码并将其粘贴到笔记本的新单元格中
  • 运行单元

6bd3dd86cc7cf802

通过将 Apache Beam 交互式运行程序与 JupyterLab 笔记本搭配使用,您可以迭代地开发流水线、检查流水线图,并在“读取-求值-输出”循环 (REPL) 工作流中解析各个 PCollection。

Apache Beam 安装到您的笔记本实例后,在笔记本中加入 interactive_runnerinteractive_beam 模块。

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

如果您的笔记本使用其他 Google 服务,请添加以下 import 语句:

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

设置互动选项

以下代码将数据捕获持续时间设置为 60 秒。如果您想加快迭代速度,请缩短时长,例如“10s”。

ib.options.recording_duration = '60s'

如需了解其他交互选项,请参阅 interactive_beam.options 类

使用 InteractiveRunner 对象初始化流水线。

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(InteractiveRunner(), options=options)

读取和直观呈现数据

以下示例展示了一个 Apache Beam 流水线,该流水线创建针对指定 Pub/Sub 主题的订阅并从该订阅中读取数据。

words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)

流水线按窗口对来源的字词进行统计。它会创建固定窗口,每个窗口的时长均为 10 秒。

windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

在将数据划分窗口后,系统会按窗口统计字词。

windowed_word_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

直观呈现数据

show() 方法会直观呈现笔记本中生成的 PCollection。

ib.show(windowed_word_counts, include_window_info=True)

以表格形式直观呈现 PCollection 的 show 方法。

如需以可视化方式显示数据,请将 visualize_data=True 传入 show() 方法。添加新单元格:

ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)

您可以对可视化数据应用多个过滤条件。以下可视化数据允许您按标签和坐标轴进行过滤:

以丰富的可过滤界面元素直观呈现 PCollection 的 show 方法。

5. 使用 Pandas Dataframe

Apache Beam 笔记本中另一个实用的可视化工具是 Pandas DataFrame。以下示例首先将字词转换为小写,然后计算每个字词的频率。

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

collect() 方法在 Pandas DataFrame 中提供输出。

ib.collect(windowed_lower_word_counts, include_window_info=True)

在 Pandas DataFrame 中展示 PCollection 的 collect 方法。

6. (可选)从笔记本启动 Dataflow 作业

  1. 如需在 Dataflow 上运行作业,您需要额外的权限。确保 Compute Engine 默认服务账号具有 Editor 角色,或者为其授予以下 IAM 角色:
  • Dataflow Admin
  • Dataflow Worker
  • Storage Admin 和
  • 服务账号用户 (roles/iam.serviceAccountUser)

如需详细了解角色,请参阅文档

  1. (可选)在使用笔记本运行 Dataflow 作业之前,请重启内核、重新运行所有单元并验证输出。
  2. 移除以下 import 语句:
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
  1. 添加以下 import 语句:
from apache_beam.runners import DataflowRunner
  1. 移除以下录制时长选项:
ib.options.recording_duration = '60s'
  1. 将以下内容添加到流水线选项中。您需要调整 Cloud Storage 位置以指向您已拥有的存储分区,也可以为此创建新的存储分区。您还可以从 us-central1 更改区域值。
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
  1. beam.Pipeline() 的构造函数中,将 InteractiveRunner 替换为 DataflowRunnerp 是创建流水线时的流水线对象。
p = beam.Pipeline(DataflowRunner(), options=options)
  1. 从代码中移除交互式调用。例如,从代码中移除 show()collect()head()show_graph()watch()
  2. 若要查看任何结果,您需要添加接收器。在上一部分中,我们直观呈现笔记本中的结果,但这一次,我们在此笔记本之外(在 Dataflow 中)运行作业。因此,我们需要一个外部位置来存储我们的结果。在此示例中,我们会将结果写入 GCS (Google Cloud Storage) 中的文本文件。由于这是一个流处理流水线,具有数据窗口,因此我们需要为每个窗口创建一个文本文件。为此,请将以下步骤添加到您的流水线中:
result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
  1. 在流水线代码末尾添加 p.run()
  2. 现在,请查看您的笔记本代码,确认您已纳入所有更改。保存后的结果应类似于下图:
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from apache_beam.runners import DataflowRunner

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location



# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(DataflowRunner(), options=options)
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

windowed_words_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))

p.run()
  1. 运行单元。
  2. 您应该会看到类似于以下内容的输出:
<DataflowPipelineResult <Job
 clientRequestId: '20230623100011457336-8998'
 createTime: '2023-06-23T10:00:33.447347Z'
 currentStateTime: '1970-01-01T00:00:00Z'
 id: '2023-06-23_03_00_33-11346237320103246437'
 location: 'us-central1'
 name: 'beamapp-root-0623075553-503897-boh4u4wb'
 projectId: 'your-project-id'
 stageStates: []
 startTime: '2023-06-23T10:00:33.447347Z'
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fc7e4084d60>
  1. 如需验证作业是否正在运行,请转到 Dataflow 的“作业”页面。您应该会在列表中看到一条新招聘信息。该作业大约需要 5-10 分钟来开始处理数据。
  2. 数据处理完毕后,前往 Cloud Storage,然后导航到 Dataflow 存储结果的目录(您定义的 output_gcs_location)。您应该会看到一个文本文件列表,每个窗口都有一个文件。bfcc5ce9e46a8b14.png
  3. 下载文件并检查内容。它应包含与其计数配对的字词列表。或者,使用命令行界面检查文件。为此,您可以在笔记本的新单元中运行以下命令:
! gsutils cat gs://<your-bucket-here>/results/<file-name>
  1. 您将看到类似于以下内容的输出:

Safer: 1

trust: 1

mercy: 1

harms: 1

far: 2

fear: 1

than: 1

take: 1

me: 1

goneril: 1

still: 1

away: 1

let: 1

too: 2

the: 1

  1. 大功告成!别忘了清理并停止已创建的作业(请参阅此 Codelab 的最后一步)。

如需查看如何在交互式笔记本上执行此转换的示例,请参阅笔记本实例中的 Dataflow Word Count 笔记本。

或者,您可以将笔记本导出为可执行脚本,按照上述步骤修改生成的 .py 文件,然后将流水线部署到 Dataflow 服务。

7. 保存笔记本

您创建的笔记本将本地保存到正在运行的笔记本实例中。如果您在开发期间重置或关停笔记本实例,则只要在 /home/jupyter 目录下创建这些新笔记本,它们就会保留。但是,如果删除笔记本实例,则这些笔记本也会被删除。

如需保留笔记本以供日后使用,请将它们本地下载到您的工作站、将其保存到 GitHub 或以其他文件格式导出。

8. 正在清理

使用完 Apache Beam 笔记本实例后,请关停笔记本实例停止流式作业(如果已运行),以清理您在 Google Cloud 上创建的资源。

或者,如果您专门为此 Codelab 创建了项目,也可以完全关停项目