将 Notebooks 与 Google Cloud Dataflow 搭配使用

1. 简介


Google Cloud Dataflow

上次更新时间:2023 年 7 月 5 日

什么是 Dataflow?

Dataflow 是一种用于执行多种数据处理模式的托管式服务。本网站上的文档介绍了如何使用 Dataflow 部署批量数据处理流水线和流式数据处理流水线,包括各项服务功能的使用说明。

Apache Beam SDK 是一个开源编程模型,您可以使用它来开发批处理流水线和流处理流水线。您可以使用 Apache Beam 程序创建流水线,然后在 Dataflow 服务上运行这些流水线。Apache Beam 文档提供了有关 Apache Beam 编程模型、SDK 和其他运行程序的深入概念信息和参考资料。


Dataflow 可实现快速、简化的流式数据流水线开发,且数据延迟时间更短。


Dataflow 的无服务器方法消除了数据工程工作负载的运营开销,让团队可以专注于编程,而无需管理服务器集群。


资源自动扩缩功能搭配费用优化的批处理功能,使得 Dataflow 可提供几乎无限的容量来管理季节性和峰值工作负载,而不会超支。



Dataflow 可自动预配和管理处理资源,以尽可能缩短延迟时间并提高利用率,因此您无需手动启动或预留实例。工作分区也是自动执行的,并且经过了优化以动态再平衡进度滞后的工作。无需苦苦寻找“热点键”或预处理输入数据。




对于调度时间较为灵活的处理作业(例如夜间作业),采用灵活资源调度 (FlexRS) 模式进行批处理可以享受更低的价格。系统会将这些灵活作业放入队列中,并保证在六小时内检索并执行这些作业。


借助 Apache Beam 交互式运行程序和 JruyterLab 笔记本,您可以迭代地开发流水线、检查流水线图,并在“读取-求值-输出”循环 (REPL) 工作流中解析独立的 PCollection。这些 Apache Beam 笔记本通过 Vertex AI Workbench 提供,这是一项代管式服务,用于托管预安装了最新数据科学和机器学习框架的笔记本虚拟机。

此 Codelab 重点介绍 Apache Beam 笔记本引入的功能。


  • 如何创建笔记本实例
  • 创建基本流水线
  • 从无界限来源读取数据
  • 直观显示数据
  • 从笔记本启动 Dataflow 作业
  • 保存笔记本


  • 启用了结算功能的 Google Cloud Platform 项目。
  • 已启用 Google Cloud Dataflow 和 Google Cloud PubSub。

2. 准备工作

  1. 在 Cloud 控制台的项目选择器页面上,选择或创建 Cloud 项目。

确保您已启用以下 API:

  • Dataflow API
  • Cloud Pub/Sub API
  • Compute Engine
  • Notebooks API

您可以通过查看 API 的 &服务页面。

在本指南中,我们将读取 Pub/Sub 订阅中的数据,因此请确保 Compute Engine 默认服务账号具有 Editor 角色,或向其授予 Pub/Sub Editor 角色。

3. Apache Beam 笔记本使用入门

启动 Apache Beam 笔记本实例

  1. 在控制台上启动 Dataflow:

  1. 使用左侧菜单选择 Workbench 页面。
  2. 确保您位于用户管理的笔记本标签页上。
  3. 在工具栏中,点击新建笔记本
  4. 选择 Apache Beam > Without GPUs
  5. 新建笔记本页面上,为笔记本虚拟机选择一个子网,然后点击创建
  6. 链接生效后,点击打开 JupyterLab。Vertex AI Workbench 会创建一个新的 Apache Beam 笔记本实例。

4. 创建流水线


导航到文件 >新建 >Notebook 中,并选择 Apache Beam 2.47 或更高版本的内核。


  • 复制每个部分中的代码并将其粘贴到笔记本的新单元格中
  • 运行单元


通过将 Apache Beam 交互式运行程序与 JupyterLab 笔记本搭配使用,您可以迭代地开发流水线、检查流水线图,并在“读取-求值-输出”循环 (REPL) 工作流中解析各个 PCollection。

Apache Beam 安装到您的笔记本实例后,在笔记本中加入 interactive_runnerinteractive_beam 模块。

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

如果您的笔记本使用其他 Google 服务,请添加以下 import 语句:

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth


以下代码将数据捕获持续时间设置为 60 秒。如果您想加快迭代速度,请缩短时长,例如“10s”。

ib.options.recording_duration = '60s'

如需了解其他交互选项,请参阅 interactive_beam.options 类

使用 InteractiveRunner 对象初始化流水线。

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(InteractiveRunner(), options=options)


以下示例展示了一个 Apache Beam 流水线,该流水线创建针对指定 Pub/Sub 主题的订阅并从该订阅中读取数据。

words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)

流水线按窗口对来源的字词进行统计。它会创建固定窗口,每个窗口的时长均为 10 秒。

windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))


windowed_word_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())


show() 方法会直观呈现笔记本中生成的 PCollection。

ib.show(windowed_word_counts, include_window_info=True)

以表格形式直观呈现 PCollection 的 show 方法。

如需以可视化方式显示数据,请将 visualize_data=True 传入 show() 方法。添加新单元格:

ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)


以丰富的可过滤界面元素直观呈现 PCollection 的 show 方法。

5. 使用 Pandas Dataframe

Apache Beam 笔记本中另一个实用的可视化工具是 Pandas DataFrame。以下示例首先将字词转换为小写,然后计算每个字词的频率。

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

collect() 方法在 Pandas DataFrame 中提供输出。

ib.collect(windowed_lower_word_counts, include_window_info=True)

在 Pandas DataFrame 中展示 PCollection 的 collect 方法。

6. (可选)从笔记本启动 Dataflow 作业

  1. 如需在 Dataflow 上运行作业,您需要额外的权限。确保 Compute Engine 默认服务账号具有 Editor 角色,或者为其授予以下 IAM 角色:
  • Dataflow Admin
  • Dataflow Worker
  • Storage Admin 和
  • 服务账号用户 (roles/iam.serviceAccountUser)


  1. (可选)在使用笔记本运行 Dataflow 作业之前,请重启内核、重新运行所有单元并验证输出。
  2. 移除以下 import 语句:
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
  1. 添加以下 import 语句:
from apache_beam.runners import DataflowRunner
  1. 移除以下录制时长选项:
ib.options.recording_duration = '60s'
  1. 将以下内容添加到流水线选项中。您需要调整 Cloud Storage 位置以指向您已拥有的存储分区,也可以为此创建新的存储分区。您还可以从 us-central1 更改区域值。
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location

# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
  1. beam.Pipeline() 的构造函数中,将 InteractiveRunner 替换为 DataflowRunnerp 是创建流水线时的流水线对象。
p = beam.Pipeline(DataflowRunner(), options=options)
  1. 从代码中移除交互式调用。例如,从代码中移除 show()collect()head()show_graph()watch()
  2. 若要查看任何结果,您需要添加接收器。在上一部分中,我们直观呈现笔记本中的结果,但这一次,我们在此笔记本之外(在 Dataflow 中)运行作业。因此,我们需要一个外部位置来存储我们的结果。在此示例中,我们会将结果写入 GCS (Google Cloud Storage) 中的文本文件。由于这是一个流处理流水线,具有数据窗口,因此我们需要为每个窗口创建一个文本文件。为此,请将以下步骤添加到您的流水线中:
result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
  1. 在流水线代码末尾添加 p.run()
  2. 现在,请查看您的笔记本代码,确认您已纳入所有更改。保存后的结果应类似于下图:
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from apache_beam.runners import DataflowRunner

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location

# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location

# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(DataflowRunner(), options=options)
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

windowed_words_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))

  1. 运行单元。
  2. 您应该会看到类似于以下内容的输出:
<DataflowPipelineResult <Job
 clientRequestId: '20230623100011457336-8998'
 createTime: '2023-06-23T10:00:33.447347Z'
 currentStateTime: '1970-01-01T00:00:00Z'
 id: '2023-06-23_03_00_33-11346237320103246437'
 location: 'us-central1'
 name: 'beamapp-root-0623075553-503897-boh4u4wb'
 projectId: 'your-project-id'
 stageStates: []
 startTime: '2023-06-23T10:00:33.447347Z'
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fc7e4084d60>
  1. 如需验证作业是否正在运行,请转到 Dataflow 的“作业”页面。您应该会在列表中看到一条新招聘信息。该作业大约需要 5-10 分钟来开始处理数据。
  2. 数据处理完毕后,前往 Cloud Storage,然后导航到 Dataflow 存储结果的目录(您定义的 output_gcs_location)。您应该会看到一个文本文件列表,每个窗口都有一个文件。bfcc5ce9e46a8b14.png
  3. 下载文件并检查内容。它应包含与其计数配对的字词列表。或者,使用命令行界面检查文件。为此,您可以在笔记本的新单元中运行以下命令:
! gsutils cat gs://<your-bucket-here>/results/<file-name>
  1. 您将看到类似于以下内容的输出:

Safer: 1

trust: 1

mercy: 1

harms: 1

far: 2

fear: 1

than: 1

take: 1

me: 1

goneril: 1

still: 1

away: 1

let: 1

too: 2

the: 1

  1. 大功告成!别忘了清理并停止已创建的作业(请参阅此 Codelab 的最后一步)。

如需查看如何在交互式笔记本上执行此转换的示例,请参阅笔记本实例中的 Dataflow Word Count 笔记本。

或者,您可以将笔记本导出为可执行脚本,按照上述步骤修改生成的 .py 文件,然后将流水线部署到 Dataflow 服务。

7. 保存笔记本

您创建的笔记本将本地保存到正在运行的笔记本实例中。如果您在开发期间重置或关停笔记本实例,则只要在 /home/jupyter 目录下创建这些新笔记本,它们就会保留。但是,如果删除笔记本实例,则这些笔记本也会被删除。

如需保留笔记本以供日后使用,请将它们本地下载到您的工作站、将其保存到 GitHub 或以其他文件格式导出。

8. 正在清理

使用完 Apache Beam 笔记本实例后,请关停笔记本实例停止流式作业(如果已运行),以清理您在 Google Cloud 上创建的资源。

或者,如果您专门为此 Codelab 创建了项目,也可以完全关停项目