在 Cloud Dataflow 中运行大数据文本处理流水线

1. 概览

Cloud-Dataflow.png

什么是 Dataflow?

Dataflow 是一种用于执行多种数据处理模式的托管式服务。本网站上的文档介绍了如何使用 Dataflow 部署批量数据处理流水线和流式数据处理流水线,包括各项服务功能的使用说明。

Apache Beam SDK 是一个开源编程模型,您可以使用它来开发批处理流水线和流处理流水线。您可以使用 Apache Beam 程序创建流水线,然后在 Dataflow 服务上运行这些流水线。Apache Beam 文档提供了有关 Apache Beam 编程模型、SDK 和其他运行程序的深入概念信息和参考资料。

快速进行流式数据分析

Dataflow 可实现快速、简化的流式数据流水线开发,且数据延迟时间更短。

简化运营和管理

Dataflow 的无服务器方法消除了数据工程工作负载的运营开销,让团队可以专注于编程,而无需管理服务器集群。

降低总拥有成本

资源自动扩缩功能搭配费用优化的批处理功能,使得 Dataflow 可提供几乎无限的容量来管理季节性和峰值工作负载,而不会超支。

主要元素

自动化资源管理和动态工作负载再平衡

Dataflow 可自动预配和管理处理资源,以尽可能缩短延迟时间并提高利用率,因此您无需手动启动或预留实例。工作分区也是自动执行的,并且经过了优化以动态再平衡进度滞后的工作。无需苦苦寻找“热点键”或预处理输入数据。

横向自动扩缩

横向自动扩缩工作器资源规模以实现最佳的吞吐量,从而获得更好的整体性价比。

面向批处理的灵活资源调度价格

对于调度时间较为灵活的处理作业(例如夜间作业),采用灵活资源调度 (FlexRS) 模式进行批处理可以享受更低的价格。系统会将这些灵活作业放入队列中,并保证在六小时内检索并执行这些作业。

本教程改编自 https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven

学习内容

  • 如何使用 Java SDK 通过 Apache Beam 创建 Maven 项目
  • 使用 Google Cloud Platform Console 运行示例流水线
  • 如何删除关联的 Cloud Storage 存储分区及其内容

所需条件

您打算如何使用本教程?

仅阅读教程内容 阅读并完成练习

您如何评价自己在使用 Google Cloud Platform 服务方面的经验水平?

<ph type="x-smartling-placeholder"></ph> 新手 中级 熟练

2. 设置和要求

自定进度的环境设置

  1. 登录 Cloud 控制台,然后创建一个新项目或重复使用现有项目。 (如果您还没有 Gmail 或 G Suite 账号,则必须创建一个。)

dMbN6g9RawQj_VXCSYpdYncY-DbaRzr2GbnwoV7jFf1u3avxJtmGPmKpMYgiaMH-qu80a_NJ9p2IIXFppYk8x3wyymZXavjglNLJJhuXieCem56H30hwXtd8PvXGpXJO9gEUDu3cZw

ci9Oe6PgnbNuSYlMyvbXF1JdQyiHoEgnhl4PlV_MFagm2ppzhueRkqX4eLjJllZco_2zCp0V0bpTupUSKji9KkQyWqj11pqit1K1faS1V6aFxLGQdkuzGp4rsQTan7F01iePL5DtqQ

8-tA_Lheyo8SscAVKrGii2coplQp2_D1Iosb2ViABY0UUO1A8cimXUu6Wf1R9zJIRExL5OB2j946aIiFtyKTzxDcNnuznmR45vZ2HMoK3o67jxuoUJCAnqvEX6NgPGFjCVNgASc-lg

请记住项目 ID,它在所有 Google Cloud 项目中都是唯一的名称(上述名称已被占用,您无法使用,抱歉!)。它稍后将在此 Codelab 中被称为 PROJECT_ID

  1. 接下来,您需要在 Cloud 控制台中启用结算功能,才能使用 Google Cloud 资源。

运行此 Codelab 应该不会产生太多的费用(如果有费用的话)。请务必按照“清理”部分部分,其中会指导您如何关停资源,以免产生超出本教程范围的结算费用。Google Cloud 的新用户符合参与 $300 USD 免费试用计划的条件。

启用 API

点击屏幕左上角的菜单图标。

2bfc27ef9ba2ec7d

选择 API 和服务 >信息中心

5b65523a6cc0afa6

选择 + 启用 API 和服务

81ed72192c0edd96

搜索“Compute Engine”。点击“Compute Engine API”。

3f201e991c7b4527

在 Google Compute Engine 页面上,点击启用

ac121653277fa7bb.png

启用后,点击箭头即可返回。

现在,搜索以下 API 并启用它们:

  • Cloud Dataflow
  • Stackdriver
  • Cloud Storage
  • Cloud Storage JSON
  • BigQuery
  • Cloud Pub/Sub
  • Cloud Datastore
  • Cloud Resource Manager API

3. 创建新的 Cloud Storage 存储桶

Google Cloud Platform Console 中,点击屏幕左上角的菜单图标:

2bfc27ef9ba2ec7d

向下滚动并选择 Cloud Storage >存储 子部分中的浏览器

2b6c3a2a92b47015

您现在应该会看到 Cloud Storage 浏览器,并且假设您使用的项目当前没有任何 Cloud Storage 存储分区,您将看到创建新存储分区的邀请。按创建存储分区按钮创建一个:

a711016d5a99dc37.png

为存储分区输入名称。如对话框所述,存储分区名称在整个 Cloud Storage 中必须是唯一的。因此,如果您选择很明显的名称(例如“test”),您可能会发现其他人已经使用该名称创建了存储分区,并且会收到错误消息。

此外,我们还有一些规则规定了存储分区名称中允许使用哪些字符。如果存储分区名称的开头和结尾都使用字母或数字,且中间只用破折号,那么没有问题。如果您尝试使用特殊字符,或尝试使用字母或数字以外的内容作为存储分区名称的开头或结尾,则对话框会提醒您注意规则。

3a5458648cfe3358

为您的存储分区输入一个唯一名称,然后按创建。如果您选择的内容已被使用,则会看到上方所示的错误消息。成功创建存储分区后,您将在浏览器中进入这个空的新存储分区:

3bda986ae88c4e71

当然,您看到的存储分区名称会有所不同,因为这些名称在所有项目中必须是唯一的。

4. 启动 Cloud Shell

激活 Cloud Shell

  1. 在 Cloud Console 中,点击激活 Cloud ShellH7JlbhKGHITmsxhQIcLwoe5HXZMhDlYue4K-SPszMxUxDjIeWfOHBfxDHYpmLQTzUmQ7Xx8o6OJUlANnQF0iBuUyfp1RzVad_4nCa0Zz5LtwBlUZFXFCWFrmrWZLqg1MkZz2LdgUDQ

zlNW0HehB_AFW1qZ4AyebSQUdWm95n7TbnOr7UVm3j9dFcg6oWApJRlC0jnU1Mvb-IQp-trP1Px8xKNwt6o3pP6fyih947sEhOFI4IRF0W7WZk6hFqZDUGXQQXrw21GuMm2ecHrbzQ

如果您以前从未启动过 Cloud Shell,将看到一个中间屏幕(在折叠下面),描述它是什么。如果是这种情况,请点击继续(您将永远不会再看到它)。一次性屏幕如下所示:

kEPbNAo_w5C_pi9QvhFwWwky1cX8hr_xEMGWySNIoMCdi-Djx9AQRqWn-__DmEpC7vKgUtl-feTcv-wBxJ8NwzzAp7mY65-fi2LJo4twUoewT1SUjd6Y3h81RG3rKIkqhoVlFR-G7w

预配和连接到 Cloud Shell 只需花几分钟时间。

pTv5mEKzWMWp5VBrg2eGcuRPv9dLInPToS-mohlrqDASyYGWnZ_SwE-MzOWHe76ZdCSmw0kgWogSJv27lrQE8pvA5OD6P1I47nz8vrAdK7yR1NseZKJvcxAZrPb8wRxoqyTpD-gbhA

这个虚拟机已加载了您需要的所有开发工具。它提供了一个持久的 5GB 主目录,并且在 Google Cloud 中运行,大大增强了网络性能和身份验证。只需使用一个浏览器或 Google Chromebook 即可完成本 Codelab 中的大部分(甚至全部)工作。

在连接到 Cloud Shell 后,您应该会看到自己已通过身份验证,并且相关项目已设置为您的项目 ID:

  1. 在 Cloud Shell 中运行以下命令以确认您已通过身份验证:
gcloud auth list

命令输出

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
gcloud config list project

命令输出

[core]
project = <PROJECT_ID>

如果不是上述结果,您可以使用以下命令进行设置:

gcloud config set project <PROJECT_ID>

命令输出

Updated property [core/project].

5. 创建 Maven 项目

在 Cloud Shell 启动后,我们首先使用适用于 Apache Beam 的 Java SDK 创建一个 Maven 项目。

Apache Beam 是一种用于数据流水线的开源编程模型。您可以使用 Apache Beam 程序定义这些流水线,并且可以选择运行程序(如 Dataflow)来执行流水线。

按如下方式在 shell 中运行 mvn archetype:generate 命令:

  mvn archetype:generate \
     -DarchetypeGroupId=org.apache.beam \
     -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
     -DarchetypeVersion=2.46.0 \
     -DgroupId=org.example \
     -DartifactId=first-dataflow \
     -Dversion="0.1" \
     -Dpackage=org.apache.beam.examples \
     -DinteractiveMode=false

运行该命令后,您应该会在当前目录下看到一个名为 first-dataflow 的新目录。first-dataflow 包含一个 Maven 项目,该项目包含 Java 版 Cloud Dataflow SDK 和示例流水线。

6. 在 Cloud Dataflow 上运行文本处理流水线

首先,将项目 ID 和 Cloud Storage 存储分区名称保存为环境变量。您可以在 Cloud Shell 中执行此操作。请务必将 <your_project_id> 替换为您自己的项目 ID。

 export PROJECT_ID=<your_project_id>

现在,我们将对 Cloud Storage 存储分区执行相同的操作。请记住,将 <your_bucket_name> 替换为您在上一步中创建存储分区时使用的唯一名称。

 export BUCKET_NAME=<your_bucket_name>

切换到 first-dataflow/ 目录:

 cd first-dataflow

我们将运行一个名为 WordCount 的管道,它会读取文本,将文本行标记化为单个字词,并对每个字词进行词频计数。首先,我们将运行流水线,并在其运行时查看每个步骤发生的情况。

通过在 shell 或终端窗口中运行 mvn compile exec:java 命令来启动流水线。对于 --project, --stagingLocation,--output 参数,以下命令会引用您在此步骤前面设置的环境变量。

 mvn compile exec:java \
      -Pdataflow-runner compile exec:java \
      -Dexec.mainClass=org.apache.beam.examples.WordCount \
      -Dexec.args="--project=${PROJECT_ID} \
      --stagingLocation=gs://${BUCKET_NAME}/staging/ \
      --output=gs://${BUCKET_NAME}/output \
      --runner=DataflowRunner \
      --region=us-central1 \
      --gcpTempLocation=gs://${BUCKET_NAME}/temp"

在作业运行时,我们在作业列表中查找该作业。

Google Cloud Platform Console 中打开 Cloud Dataflow 网页界面。您应该会看到处于正在运行状态的字数统计作业:

3623be74922e3209

现在,我们来看看流水线参数。首先,点击作业名称:

816d8f59c72797d7

选择作业后,您可以查看执行图。流水线的执行图将流水线中的每个转换表示为一个方框,其中包含转换名称和一些状态信息。您可以点击每个步骤右上角的“Carat”符号查看更多详情:

80a972dd19a6f1eb

我们来看一下流水线如何在每个步骤中转换数据:

  • 读取:在此步骤中,流水线从输入源中读取数据。在本示例中,它是来自 Cloud Storage 的文本文件,其中包含莎士比亚戏剧王李的全文。我们的流水线会逐行读取文件,并输出每个 PCollection,其中文本文件中的每一行都是集合中的一个元素。
  • CountWordsCountWords 步骤包含两个部分。首先,它使用名为 ExtractWords 的并行 do 函数 (ParDo) 将每行标记化为单独的单词。ExtractWords 的输出是一个新的 PCollection,其中每个元素都是一个单词。下一步是 Count,利用 Java SDK 提供的转换,该转换会返回键值对,其中键是唯一单词,值是出现次数。下面是实现 CountWords 的方法,您可以在 GitHub 上查看完整的 WordCount.java 文件:
 /**
   * A PTransform that converts a PCollection containing lines of text into a PCollection of
   * formatted word counts.
   *
   * <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
   * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
   * modular testing, and an improved monitoring experience.
   */
  public static class CountWords
      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());

      return wordCounts;
    }
  }
  • MapElements:这会调用下面复制的 FormatAsTextFn,它会将每个键值对的格式设置为可打印的字符串。
  /** A SimpleFunction that converts a Word and Count into a printable string. */
  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    @Override
    public String apply(KV<String, Long> input) {
      return input.getKey() + ": " + input.getValue();
    }
  }
  • WriteCounts:在此步骤中,我们将可打印的字符串写入多个分片文本文件。

我们会在几分钟后查看流水线生成的输出。

现在,查看图表右侧的作业信息页面,其中包含我们在 mvn compile exec:java 命令中包含的流水线参数。

9723815a1f5bf08b

208a7f0d6973acf6.png

您还可以查看流水线的自定义计数器,在本例中,该计数器会显示到目前为止执行期间遇到了多少空行。您可以将新的计数器添加到流水线中,以跟踪特定于应用的指标。

a2e2800e2c6893f8.png

您可以点击控制台底部的日志图标,查看具体的错误消息。

23c64138a1027f8

该面板默认显示作业日志消息,用于报告作业的整体状态。您可以使用最低严重级别选择器来过滤作业进度和状态消息。

94ba42015fdafbe2

选择图表中的流水线步骤可将视图更改为由您的代码生成的日志,以及在流水线步骤中运行的生成的代码。

要返回“作业日志”,请点击图表外部区域或使用右侧面板中的“关闭”按钮来取消选中步骤。

您可以使用“日志”标签页中的工作器日志按钮来查看运行流水线的 Compute Engine 实例的工作器日志。工作器日志由您的代码生成的日志行以及 Dataflow 生成的运行该代码的代码组成。

如果您要尝试调试流水线中的故障,工作器日志中通常会显示有助于解决问题的其他日志记录。请注意,这些日志是从所有工作器汇总,可以过滤和搜索。

5a53c244f28d5478

Dataflow 监控界面仅显示最新的日志消息。您可以点击日志窗格右侧的 Google Cloud Observability 链接,以查看所有日志。

2bc704a4d6529b31

下面汇总了可从“监控”→“日志”页面查看的不同日志类型:

  • job-message 日志包含 Dataflow 的各个组件生成的作业级消息。例如自动扩缩配置、工作器启动或关停时间、作业步骤进度以及作业错误。源自崩溃用户代码以及 worker 日志中出现的工作器级错误也会传播到 job-message 日志。
  • worker 日志由 Dataflow 工作器生成。工作器负责完成大部分流水线工作(例如,将 ParDo 应用于数据)。Worker 日志包含由您的代码和 Dataflow 记录的消息。
  • worker-startup 日志存在于大多数 Dataflow 作业中,可捕获与启动过程相关的消息。启动过程包括从 Cloud Storage 下载作业的 JAR 文件,然后启动工作器。如果启动 worker 时出现问题,请查看这些日志。
  • shuffler 日志中包含的消息来自负责整合并行流水线操作结果的工作器。
  • dockerkubelet 日志包含与在 Dataflow 工作器上使用的公开技术相关的消息。

在下一步中,我们将检查您的作业是否成功。

7. 检查您的作业是否成功运行

Google Cloud Platform Console 中打开 Cloud Dataflow 网页界面。

您应该会看到字数统计作业最初的状态为正在运行,然后变为成功

4c408162416d03a2

该作业大约需要 3-4 分钟来运行。

还记得您运行流水线并指定输出存储分区的时候吗?我们来看一下结果(难道你不想看《King Lear》中每个字词出现多少次?!)。返回 Google Cloud Platform Console 中的 Cloud Storage 浏览器。在您的存储分区中,您应该会看到作业所创建的输出文件和暂存文件:

25a5d3d4b5d0b567

8. 关停资源

您可以从 Google Cloud Platform Console 关停您的资源。

在 Google Cloud Platform Console 中打开 Cloud Storage 浏览器。

2b6c3a2a92b47015

选中您创建的存储分区旁边的复选框,然后点击删除以永久删除存储分区及其内容。

2f7780bdf10b69ba

8051ef293a8e5cfe

9. 恭喜!

您学习了如何使用 Cloud Dataflow SDK 创建 Maven 项目、使用 Google Cloud Platform Console 运行示例流水线,以及删除关联的 Cloud Storage 存储分区及其内容。

了解详情

许可

此作品已获得知识共享署名 3.0 通用许可和 Apache 2.0 许可。