使用 Dataproc 和 Cloud Pub/Sub 构建更改数据捕获

使用 Dataproc 和 Cloud Pub/Sub 构建更改数据捕获

关于此 Codelab

subject上次更新时间:6月 19, 2025
account_circleJatin Narula 编写

1. 简介

df8070bd84336207.png

上次更新时间:2025 年 6 月 19 日

什么是变更数据捕获?

变更数据捕获 (CDC) 是一组软件设计模式,用于确定和跟踪数据库中已更改的数据。简单来说,这是一种捕获和记录对数据所做的更改的方法,以便将这些更改复制到其他系统。

变更数据捕获 (CDC) 在各种数据驱动型场景(例如数据迁移、实时数据仓库和分析、灾难恢复和高可用性、审核和合规性等)中非常有用。

数据迁移

CDC 支持增量数据传输,可缩短停机时间并最大限度地减少中断,从而简化数据迁移项目。

实时数据仓库和分析

CDC 可确保数据仓库和分析系统不断更新,以反映操作数据库中的最新更改。

这样,企业就可以根据实时信息做出决策。

灾难恢复和高可用性

CDC 支持将数据实时复制到辅助数据库,以实现灾难恢复。在发生故障时,CDC 可快速故障切换到次要数据库,从而尽可能缩短停机时间并减少数据丢失。

审核和合规性

CDC 会提供数据更改的详细审核轨迹,这对于满足合规性和监管要求至关重要。

构建内容

在此 Codelab 中,您将使用 Cloud Pub/Sub、Dataproc、Python 和 Apache Spark 构建变更数据捕获 (CDC) 数据流水线。您的流水线将:

  • 模拟数据库更改,并将其作为事件发布到可扩缩且可靠的消息传递服务 Cloud Pub/Sub。
  • 利用 Google Cloud 的代管式 Spark 和 Hadoop 服务 Dataproc 的强大功能,实时处理这些事件。

通过连接这些服务,您可以创建一个强大的渠道,能够在数据发生变化时捕获和处理这些变化,为实时分析、数据仓库和其他关键应用奠定基础。

学习内容

此 Codelab 重点介绍 Dataproc 和 Cloud Pub/Sub。对于不相关的概念,我们仅会略作介绍,但是会提供相应代码块供您复制和粘贴。

所需条件

  • 已设置项目的有效 GCP 账号。如果您没有 Google Workspace 账号,可以注册免费试用。
  • 已安装并配置 gcloud CLI
  • 安装了 Python 3.7 或更高版本,用于模拟数据库更改并与 Pub/Sub 交互。
  • 对 Dataproc、Cloud Pub/Sub、Apache Spark 和 Python 有基本的了解。

开始前须知

在终端中执行以下命令以启用所需的 API:

gcloud services enable \
    dataproc
.googleapis.com \
    pubsub
.googleapis.com \

2. 设置 Cloud Pub/Sub

创建主题

此主题将用于发布数据库更改。Dataproc 作业将成为这些消息的使用方,并会处理这些消息以进行更改数据捕获。如需详细了解这些主题,请点击此处参阅官方文档。

gcloud pubsub topics create database-changes

创建订阅

创建一个订阅,用于使用 Pub/Sub 中的消息。如需详细了解订阅,请点击此处阅读官方文档。

gcloud pubsub subscriptions create --topic database-changes change-subscriber

3. 模拟数据库更改

步骤

  1. 创建一个 Python 脚本(例如simulate_cdc.py)来模拟数据库更改并将其发布到 Pub/Sub。
from google.cloud import pubsub_v1
import json
import time
import random

project_id = "YOUR_PROJECT_ID"  # Replace with your GCP project ID
topic_id = "database-changes"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

def publish_message(data):
   
data_str = json.dumps(data).encode("utf-8")
   
future = publisher.publish(topic_path, data=data_str)
   
print(f"Published message ID: {future.result()}")

def simulate_change():
   
change_types = ["INSERT", "UPDATE", "DELETE"]
   
change_type = random.choice(change_types)
   
record_id = random.randint(1, 100)
   
timestamp = time.time()
   
change_data = {
       
"change_type": change_type,
       
"record_id": record_id,
       
"timestamp": timestamp,
       
"data": {"field1": "value1", "field2": "value2"}, #Place holder data.
   
}
   
publish_message(change_data)

if __name__ == "__main__":
   
while True:
       
simulate_change()
       
time.sleep(2)  # Simulate changes every 2 seconds

将 YOUR_PROJECT_ID 替换为您的实际 GCP 项目 ID

  1. 安装 Pub/Sub 客户端库:
pip install google-cloud-pubsub
  1. 在终端上运行该脚本。此脚本将持续运行,并每 2 秒向 Pub/Sub 主题发布一条消息。
python simulate_cdc.py
  1. 运行脚本大约 1 分钟后,Pub/Sub 中就会有足够的消息可供使用。您可以按 Ctrl + C 或 Cmd + C 终止正在运行的 Python 脚本,具体取决于您的操作系统。
  2. 查看已发布的消息:

打开另一个终端并运行以下命令,查看已发布的消息:

gcloud pubsub subscriptions pull --auto-ack change-subscriber

您应该会看到一个包含消息和其他字段的表格行:

{"change_type": "UPDATE", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}

说明

  • Python 脚本通过随机生成 INSERTUPDATEDELETE 事件来模拟数据库更改。
  • 每个更改都表示为一个 JSON 对象,其中包含更改类型、记录 ID、时间戳和数据。
  • 该脚本使用 Cloud Pub/Sub 客户端库将这些更改事件发布到 database-changes 主题。
  • 借助订阅方命令,您可以查看发送到 pub/sub 主题的消息。

4. 为 Dataproc 创建服务账号

在本部分中,您将创建一个可供 Dataproc 集群使用的服务账号。您还可以为该服务账号分配必要的权限,以允许集群实例访问 Cloud Pub/Sub 和 Dataproc。

  1. 创建服务账号:
export SERVICE_ACCOUNT_NAME="dataproc-service-account"

gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
  1. 添加 Dataproc 工作器角色,以允许服务账号创建集群并运行作业。在以下命令中将上一个命令生成的服务账号 ID 添加为成员:
export PROJECT=$(gcloud info --format='value(config.project)')

gcloud projects add-iam-policy-binding $PROJECT \
        --role roles/dataproc.worker \
        --member="serviceAccount:<your-service-account-with-domain>"
  1. 添加 Pub/Sub 订阅者角色,以允许服务账号订阅“change-subscriber”Pub/Sub 订阅:
gcloud beta pubsub subscriptions add-iam-policy-binding \
        change
-subscriber \
       
--role roles/pubsub.subscriber \
       
--member="serviceAccount:<your-service-account-with-domain"

5. 创建 Dataproc 集群

Dataproc 集群将运行 Spark 应用,该应用将处理 Pub/Sub 中的消息。您需要在上一部分中创建的服务账号。Dataproc 将此服务账号分配给集群中的每个实例,以便所有实例都获得正确的应用运行权限。

使用以下命令创建 Dataproc 集群:

gcloud dataproc clusters create cdc-dataproc-cluster \
    --region=us-central1 \
    --zone=us-central1-a \
    --scopes=pubsub,datastore \
    --image-version=1.3 \
    --service-account="<your-service-account-with-domain-id>"

6. 将 Spark 作业提交到 Dataproc 集群

Spark 流式应用会处理 Pub/Sub 中的数据库更改消息,并将其输出到控制台。

步骤

  1. 创建一个目录,并将使用方的源代码添加到 PubsubConsumer.scala 文件中
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc
-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala
package demo

import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PubsubConsumer {

 
def createContext(projectID: String, checkpointDirectory: String)
   
: StreamingContext = {

   
// [START stream_setup]
   
val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
   
val ssc = new StreamingContext(sparkConf, Seconds(1))

   
// Set the checkpoint directory
   
val yarnTags = sparkConf.get("spark.yarn.tags")
   
val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
   
ssc.checkpoint(checkpointDirectory + '/' + jobId)
   
   
// Create stream
   
val messagesStream: DStream[String] = PubsubUtils
     
.createStream(
       
ssc,
       
projectID,
       
None,
       
"change-subscriber",  // Cloud Pub/Sub subscription for incoming database updates
       
SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
     
.map(message => new String(message.getData(), StandardCharsets.UTF_8))
   
// [END stream_setup]

   
processStringDStream(messagesStream)
   
       
ssc
 
}

 
def processStringDStream(stringDStream: DStream[String]): Unit = {
   
stringDStream.foreachRDD { rdd =>
     
if (!rdd.isEmpty()) {
       
val listOfStrings: List[String] = rdd.collect().toList
       
listOfStrings.foreach(str => println(s"message received: $str"))
     
} else {
       
println("looking for message...")
     
}
   
}
 
}

 
def main(args: Array[String]): Unit = {
   
if (args.length != 2) {
     
System.err.println("arguments are not passed correctly!")
     
System.exit(1)
   
}

   
val Seq(projectID, checkpointDirectory) = args.toSeq

   
// Create Spark context
   
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
     
() => createContext(projectID, checkpointDirectory))

   
// Start streaming until we receive an explicit termination
   
ssc.start()
   
ssc.awaitTermination()
 
}

}
  1. 创建以下内容并将其添加到 pom.xml
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <jvm.options.xms>-Xms512M</jvm.options.xms>
    <jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
    <jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
    <jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
  </properties>

  <groupId>dataproc-spark-demos</groupId>
  <artifactId>spark-streaming-pubsub-demo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.2.0</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-datastore</artifactId>
      <version>1.34.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>spark-streaming-pubsub_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>

    <dependency>
      <groupId>org.scalacheck</groupId>
      <artifactId>scalacheck_2.11</artifactId>
      <version>1.14.0</version>
      <scope>test</scope>
    </dependency>

  </dependencies>

  <build>
    <plugins>

      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.3.2</version>
        <executions>
          <execution>
            <id>compile</id>
            <goals>
              <goal>compile</goal>
            </goals>
            <phase>compile</phase>
          </execution>
          <execution>
            <id>test-compile</id>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <phase>test-compile</phase>
          </execution>
          <execution>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>demo.PubsubConsumer</mainClass>
                </transformer>
              </transformers>
              <relocations>
                <relocation>
                  <pattern>com</pattern>
                  <shadedPattern>repackaged.com</shadedPattern>
                  <includes>
                    <include>com.google.protobuf.**</include>
                    <include>com.google.common.**</include>
                  </includes>
                </relocation>
              </relocations>
            </configuration>
          </execution>
        </executions>
      </plugin>

    </plugins>
  </build>

</project>
  1. 切换到项目的 spark 目录,并将路径保存到环境变量中以供稍后使用:
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
  1. 更改目录:
cd $REPO_ROOT/spark
  1. 下载 Java 1.8 并将该文件夹放置在 /usr/lib/jvm/ 中。然后,将 JAVA_HOME 更改为指向以下位置:
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
  1. 构建应用 jar
mvn clean package

spark/target 目录中创建包含应用代码和依赖项的 spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar 归档

  1. 提交 Spark 应用:
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"

gcloud dataproc jobs submit spark \
    --cluster cdc-dataproc-cluster \
    --region us-central1 \
    --async \
    --jar target/$JAR \
    --max-failures-per-hour 10 \
    --properties $SPARK_PROPERTIES \
    -- $ARGUMENTS
  1. 显示活跃的作业列表并记下作业的 JOB_ID 值:
gcloud dataproc jobs list --region=us-central1 --state-filter=active

输出将如下所示

JOB_ID                            TYPE   STATUS
473ecb6d14e2483cb88a18988a5b2e56  spark  RUNNING
  1. 在浏览器中打开以下网址查看作业输出。将 [JOB_ID] 替换为上一步中记录的值。
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
  1. 输出类似于以下内容:
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...

在 Dataproc 中运行的 Spark 流式作业会从 Pub/Sub 拉取消息、对其进行处理,并将输出显示在控制台中。

  1. 终止作业:运行以下命令以终止作业。将 JOB_ID 替换为我们之前记下的 ID
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet

恭喜!您刚刚创建了一个强大的 CDC 流水线,该流水线会捕获 Pub/Sub 中的数据库更改,并使用在 Cloud Dataproc 中运行的 Spark 流式处理这些更改。

7. 清理

清理您创建的所有资源,避免日后再为这些资源支付费用。若要避免产生费用,最简单的方法是删除您为本教程创建的项目。或者,您可以删除单个资源。

运行以下命令以删除单个资源

gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics
delete database-changes --quiet
gcloud pubsub subscriptions
delete change-subscriber --quiet
gcloud iam service
-accounts delete <your-service-account-with-domain> --quiet

8. 恭喜

恭喜,您刚刚完成了一个实操 Codelab,该 Codelab 演示了如何使用 Google Cloud Platform 构建强大的实时数据流水线。我们来回顾一下您已完成的内容:

  • 模拟变更数据捕获 (CDC):您学习了 CDC 的基础知识,并实现了一个 Python 脚本来模拟数据库更改,生成表示实时数据修改的事件。
  • 利用 Cloud Pub/Sub:您设置 Cloud Pub/Sub 主题和订阅,从而提供可扩缩且可靠的消息传递服务,用于流式传输 CDC 事件。您已将模拟的数据库更改发布到 Pub/Sub,从而创建了实时数据流。
  • 使用 Dataproc 和 Spark 处理的数据:您已预配 Dataproc 集群并部署了 Spark Streaming 作业,以使用 Pub/Sub 订阅中的消息。您已实时处理和转换传入的 CDC 事件,并在 Dataproc 作业日志中显示结果。
  • 构建了端到端实时数据流水线:您成功集成了这些服务,创建了一个完整的数据流水线,可实时捕获、流式传输和处理数据更改。您获得了构建可处理连续数据流的系统的实用经验。
  • 使用了 Spark Pub/Sub 连接器:您已成功配置 Dataproc 集群以使用 Spark Pub/Sub 连接器,这对于 Spark Structured Streaming 从 Pub/Sub 读取数据至关重要。

现在,您已经打下了坚实的基础,可以为各种应用(包括实时分析、数据仓库和微服务架构)构建更复杂、更精细的数据流水线。继续探索和构建!

参考文档