1. 概览

什么是 Dataflow?
Dataflow 是一种用于执行各种数据处理模式的托管式服务。本网站上提供的文档介绍如何使用 Dataflow 部署批量数据处理流水线和流式数据处理流水线,其中包括各项服务功能的使用说明。
Apache Beam SDK 是一个开源编程模型,既可用于开发批处理流水线,又可用于开发流处理流水线。您可以使用 Apache Beam 程序创建流水线,然后在 Dataflow 服务上运行这些流水线。Apache Beam 文档提供了有关 Apache Beam 编程模型、SDK 和其他运行程序的深入概念性信息和参考资料。
快速进行流式数据分析
Dataflow 可实现快速、简化的流式数据流水线开发,且数据延迟时间更短。
简化运营和管理
Dataflow 的无服务器方法消除了数据工程工作负载的运营开销,让团队可以专注于编程,而不必管理服务器集群。
降低总拥有成本
资源自动扩缩功能搭配费用优化的批处理功能,使得 Dataflow 可提供几乎无限的容量来管理季节性和峰值工作负载,而不会让您过度开支。
主要功能
自动化资源管理和动态工作负载再平衡
Dataflow 自动执行处理资源的预配和管理,最大限度地缩短延迟时间并提高利用率,因而您无需再手动启动或预留实例。工作分区也是自动执行的,并且经过了优化以便动态再平衡进度滞后的工作。您无需再苦苦寻找“热点键”或对输入数据进行预处理。
横向自动扩缩
横向自动扩缩工作器资源规模以达到最佳的吞吐量,从而实现了更高的整体性价比。
面向批处理的灵活资源调度价格
对于调度时间较为灵活的处理作业(例如夜间作业),采用灵活资源调度 (FlexRS) 方式进行批处理可以享受更低的价格。系统会将这些灵活作业放入队列中,并保证在六小时内检索并执行这些作业。
本教程改编自 https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven
学习内容
- 如何使用 Java SDK 通过 Apache Beam 创建 Maven 项目
- 使用 Google Cloud Platform 控制台运行示例流水线
- 如何删除关联的 Cloud Storage 存储分区及其内容
所需条件
您打算如何使用本教程?
您如何评价自己在使用 Google Cloud Platform 服务方面的经验水平?
2. 设置和要求
自定进度的环境设置
请记住项目 ID,它在所有 Google Cloud 项目中都是唯一的名称(上述名称已被占用,您无法使用,抱歉!)。它稍后将在此 Codelab 中被称为 PROJECT_ID。
- 接下来,您需要在 Cloud 控制台中启用结算功能,才能使用 Google Cloud 资源。
运行此 Codelab 应该不会产生太多的费用(如果有费用的话)。请务必按照“清理”部分中的所有说明操作,该部分介绍了如何关停资源,以免产生超出本教程范围的结算费用。Google Cloud 的新用户符合参与 $300 USD 免费试用计划的条件。
启用 API
点击屏幕左上角的菜单图标。

从下拉菜单中选择 API 和服务 > 信息中心。

选择 + 启用 API 和服务。

在搜索框中搜索“Compute Engine”。在显示的搜索结果列表中,点击“Compute Engine API”。

在 Google Compute Engine 页面上,点击启用

启用后,点击箭头返回。
现在,搜索以下 API 并启用它们:
- Cloud Dataflow
- Stackdriver
- Cloud Storage
- Cloud Storage JSON
- BigQuery
- Cloud Pub/Sub
- Cloud Datastore
- Cloud Resource Manager API
3. 创建新的 Cloud Storage 存储分区
在 Google Cloud Platform 控制台中,点击屏幕左上角的菜单图标:

在存储子部分中,向下滚动并选择 Cloud Storage > 浏览器:

您现在应该会看到 Cloud Storage 浏览器,并且假设您使用的是目前没有任何 Cloud Storage 存储分区的项目,您会看到创建新存储分区的邀请。按创建分桶按钮以创建一个分桶:

为存储分区输入名称。如对话框中所述,存储分区名称在整个 Cloud Storage 中必须具有唯一性。因此,如果您选择一个显而易见的名称(例如“test”),您可能会发现其他人已经创建了具有该名称的存储分区,并会收到错误消息。
此外,对于存储分区名称中允许使用的字符,也有一些规则。如果您的存储分区名称以字母或数字开头和结尾,并且中间只使用短划线,则不会有问题。如果您尝试使用特殊字符,或者尝试以字母或数字以外的字符开头或结尾来命名存储分区,系统会在对话框中提醒您相关规则。

为您的存储分区输入一个唯一的名称,然后按创建。如果您选择的名称已被使用,系统会显示上述错误消息。成功创建存储分区后,系统会将您带到浏览器中新创建的空存储分区:

您看到的存储分区名称当然会有所不同,因为存储分区名称在所有项目中必须是唯一的。
4. 启动 Cloud Shell
激活 Cloud Shell
- 在 Cloud Console 中,点击激活 Cloud Shell
。
如果您以前从未启动过 Cloud Shell,将看到一个中间屏幕(在折叠下面),描述它是什么。如果是这种情况,请点击继续(您将永远不会再看到它)。一次性屏幕如下所示:
预配和连接到 Cloud Shell 只需花几分钟时间。
这个虚拟机已加载了您需要的所有开发工具。它提供了一个持久的 5GB 主目录,并且在 Google Cloud 中运行,大大增强了网络性能和身份验证。只需使用一个浏览器或 Google Chromebook 即可完成本 Codelab 中的大部分(甚至全部)工作。
在连接到 Cloud Shell 后,您应该会看到自己已通过身份验证,并且相关项目已设置为您的项目 ID:
- 在 Cloud Shell 中运行以下命令以确认您已通过身份验证:
gcloud auth list
命令输出
Credentialed Accounts
ACTIVE ACCOUNT
* <my_account>@<my_domain.com>
To set the active account, run:
$ gcloud config set account `ACCOUNT`
gcloud config list project
命令输出
[core] project = <PROJECT_ID>
如果不是上述结果,您可以使用以下命令进行设置:
gcloud config set project <PROJECT_ID>
命令输出
Updated property [core/project].
5. 创建 Maven 项目
启动 Cloud Shell 后,我们先使用 Java 版 Apache Beam SDK 创建一个 Maven 项目。
Apache Beam 是一款用于数据流水线的开源编程模型。您可以使用 Apache Beam 程序定义这些流水线,并且可以选择运行程序(如 Dataflow)来执行流水线。
在 shell 中运行 mvn archetype:generate 命令,如下所示:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=2.46.0 \
-DgroupId=org.example \
-DartifactId=first-dataflow \
-Dversion="0.1" \
-Dpackage=org.apache.beam.examples \
-DinteractiveMode=false
运行该命令后,您应该会在当前目录下看到一个名为 first-dataflow 的新目录。first-dataflow 包含一个 Maven 项目,其中包含 Java 版 Cloud Dataflow SDK 和示例流水线。
6. 在 Cloud Dataflow 上运行文本处理流水线
我们先将项目 ID 和 Cloud Storage 存储分区名称保存为环境变量。您可以在 Cloud Shell 中执行此操作。请务必将 <your_project_id> 替换为您自己的项目 ID。
export PROJECT_ID=<your_project_id>
现在,我们将对 Cloud Storage 存储分区执行相同的操作。请注意,您需要将 <your_bucket_name> 替换为您在之前的步骤中用于创建存储分区的唯一名称。
export BUCKET_NAME=<your_bucket_name>
切换到 first-dataflow/ 目录:
cd first-dataflow
我们将运行一个名为 WordCount 的流水线,该流水线可读取文本、将文本行标记化为单个词,并对其中的每个词执行词频计数。首先,我们将运行流水线,并在流水线运行时查看每个步骤中发生的情况。
在 shell 或终端窗口中运行 mvn compile exec:java 命令,以启动流水线。对于 --project, --stagingLocation, 和 --output 实参,以下命令引用了您在此步骤中之前设置的环境变量。
mvn compile exec:java \
-Pdataflow-runner compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--project=${PROJECT_ID} \
--stagingLocation=gs://${BUCKET_NAME}/staging/ \
--output=gs://${BUCKET_NAME}/output \
--runner=DataflowRunner \
--region=us-central1 \
--gcpTempLocation=gs://${BUCKET_NAME}/temp"
在作业运行期间,我们来在作业列表中找到该作业。
在 Google Cloud Platform Console 中打开 Cloud Dataflow 网页界面。您会发现,您的 wordcount 任务的状态是 Running(正在运行):

现在,我们来看看流水线参数。首先,点击您的作业名称:

选择作业后,您可以查看执行图。在流水线的执行图中,每个方框表示流水线中的一个转换,其中包含转换名称和一些状态信息。您可以点击每个步骤右上角的尖角来查看更多详细信息:

我们来看看流水线如何在每个步骤中转换数据:
- 读取:在此步骤中,流水线从输入源读取数据。在本例中,该文件是 Cloud Storage 中的一个文本文件,其中包含莎士比亚戏剧《李尔王》的完整文本。我们的流水线会逐行读取文件,并输出一个
PCollection,其中文本文件中的每一行都是集合中的一个元素。 - CountWords:
CountWords步骤包含两个部分。首先,它使用名为ExtractWords的并行 do 函数 (ParDo) 将每行标记化为单个字词。ExtractWords 的输出是一个新的 PCollection,其中的每个元素都是一个字词。下一步Count利用 Java SDK 提供的一种转换,该转换会返回键值对,其中键是唯一字词,值是该字词的出现次数。以下是实现CountWords的方法,您可以在 GitHub 上查看完整的 WordCount.java 文件:
/**
* A PTransform that converts a PCollection containing lines of text into a PCollection of
* formatted word counts.
*
* <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
* Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
* modular testing, and an improved monitoring experience.
*/
public static class CountWords
extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
// Count the number of times each word occurs.
PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());
return wordCounts;
}
}
- MapElements:此函数会调用下面的
FormatAsTextFn,该函数会将每个键值对格式化为可打印的字符串。
/** A SimpleFunction that converts a Word and Count into a printable string. */
public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
@Override
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
}
}
- WriteCounts:在此步骤中,我们将可打印的字符串写入多个分片文本文件。
我们将在几分钟后查看流水线的输出结果。
现在,我们来看看图表右侧的作业信息页面,其中包含我们在 mvn compile exec:java 命令中添加的流水线参数。


您还可以看到流水线的自定义计数器,在本例中,该计数器显示了执行期间到目前为止遇到的空行数。您可以向流水线添加新的计数器,以便跟踪特定于应用的指标。

您可以点击控制台底部的日志图标,查看具体的错误消息。

该面板默认显示报告作业的整体状态的作业日志消息。您可以使用“最低严重级别”选择器,按作业进度和状态消息进行过滤。

选择图表中的流水线步骤可将视图更改为由您的代码生成的日志,以及在流水线步骤中运行的生成的代码。
如需返回“作业日志”,请点击图表外部区域或使用右侧侧边栏中的“关闭”按钮来取消选中相应步骤。
您可以使用日志标签页中的工作器日志按钮查看运行流水线的 Compute Engine 实例的工作器日志。工作器日志包含由您的代码和运行该代码的 Dataflow 生成的代码生成的日志行。
如果您尝试调试流水线中的失败,通常会在工作器日志中找到有助于解决问题的其他日志记录。请注意,这些日志是所有工作器的汇总日志,可以进行过滤和搜索。

Dataflow 监控界面仅显示最近的日志消息。点击日志窗格右侧的“Google Cloud Observability”链接,即可查看所有日志。

下面汇总了可从“监控”→“日志”页面查看的不同日志类型:
- job-message 日志包含 Dataflow 的各个组件生成的作业级消息。相关示例包括自动扩缩配置、工作器启动或关停的时间、作业步骤进度以及作业错误。源自崩溃用户代码以及 worker 日志中出现的工作器级错误也会记录到 job-message 日志中。
- worker 日志由 Dataflow 工作器生成。工作器负责完成大多数流水线工作(例如将 ParDo 应用于数据)。Worker 日志包含由您的代码和 Dataflow 记录的消息。
- worker-startup 日志存在于大多数 Dataflow 作业中,可捕获与启动过程相关的消息。启动过程包括从 Cloud Storage 下载作业的 JAR 文件,然后启动工作器。如果启动工作器时出现问题,则建议查看此日志。
- shuffler 日志中包含的消息来自负责整合并行流水线操作结果的工作器。
- docker 和 kubelet 日志包含与在 Dataflow 工作器上使用的这些公开技术相关的消息。
在下一步中,我们将检查您的作业是否成功运行。
7. 检查作业是否成功运行
在 Google Cloud Platform Console 中打开 Cloud Dataflow 网页界面。
您应会看到 wordcount 作业的状态最初为正在运行,然后变为成功:

该作业大约需要 3-4 分钟才能运行完毕。
还记得您运行流水线并指定输出存储分区时的情景吗?让我们来看看结果(因为您肯定想知道《李尔王》中每个字词出现的次数!)。返回到 Google Cloud Platform 控制台中的 Cloud Storage 浏览器。在您的存储分区中,您应该会看到作业所创建的输出文件和暂存文件:

8. 关闭资源
您可以通过 Google Cloud Platform Console 关闭资源。
在 Google Cloud Platform Console 中打开 Cloud Storage 浏览器。

选中您创建的存储分区旁边的复选框,然后点击删除以永久删除该存储分区及其内容。


9. 恭喜!
您已了解如何使用 Cloud Dataflow SDK 创建 Maven 项目、使用 Google Cloud Platform 控制台运行示例流水线,以及删除关联的 Cloud Storage 存储分区及其内容。
了解详情
- Dataflow 文档:https://cloud.google.com/dataflow/docs/
许可
此作品已获得 知识共享署名 3.0 通用许可和 Apache 2.0 许可授权。