关于此 Codelab
1. 简介
上次更新时间:2025 年 6 月 19 日
什么是变更数据捕获?
变更数据捕获 (CDC) 是一组软件设计模式,用于确定和跟踪数据库中已更改的数据。简单来说,这是一种捕获和记录对数据所做的更改的方法,以便将这些更改复制到其他系统。
变更数据捕获 (CDC) 在各种数据驱动型场景(例如数据迁移、实时数据仓库和分析、灾难恢复和高可用性、审核和合规性等)中非常有用。
数据迁移
CDC 支持增量数据传输,可缩短停机时间并最大限度地减少中断,从而简化数据迁移项目。
实时数据仓库和分析
CDC 可确保数据仓库和分析系统不断更新,以反映操作数据库中的最新更改。
这样,企业就可以根据实时信息做出决策。
灾难恢复和高可用性
CDC 支持将数据实时复制到辅助数据库,以实现灾难恢复。在发生故障时,CDC 可快速故障切换到次要数据库,从而尽可能缩短停机时间并减少数据丢失。
审核和合规性
CDC 会提供数据更改的详细审核轨迹,这对于满足合规性和监管要求至关重要。
构建内容
在此 Codelab 中,您将使用 Cloud Pub/Sub、Dataproc、Python 和 Apache Spark 构建变更数据捕获 (CDC) 数据流水线。您的流水线将:
- 模拟数据库更改,并将其作为事件发布到可扩缩且可靠的消息传递服务 Cloud Pub/Sub。
- 利用 Google Cloud 的代管式 Spark 和 Hadoop 服务 Dataproc 的强大功能,实时处理这些事件。
通过连接这些服务,您可以创建一个强大的渠道,能够在数据发生变化时捕获和处理这些变化,为实时分析、数据仓库和其他关键应用奠定基础。
学习内容
- 如何创建基本更改数据捕获流水线
- 用于流处理的 Dataproc
- 用于实时消息传递的 Cloud Pub/Sub
- Apache Spark 基础知识
此 Codelab 重点介绍 Dataproc 和 Cloud Pub/Sub。对于不相关的概念,我们仅会略作介绍,但是会提供相应代码块供您复制和粘贴。
所需条件
- 已设置项目的有效 GCP 账号。如果您没有 Google Workspace 账号,可以注册免费试用。
- 已安装并配置 gcloud CLI。
- 安装了 Python 3.7 或更高版本,用于模拟数据库更改并与 Pub/Sub 交互。
- 对 Dataproc、Cloud Pub/Sub、Apache Spark 和 Python 有基本的了解。
开始前须知
在终端中执行以下命令以启用所需的 API:
gcloud services enable \
dataproc.googleapis.com \
pubsub.googleapis.com \
2. 设置 Cloud Pub/Sub
创建主题
此主题将用于发布数据库更改。Dataproc 作业将成为这些消息的使用方,并会处理这些消息以进行更改数据捕获。如需详细了解这些主题,请点击此处参阅官方文档。
gcloud pubsub topics create database-changes
创建订阅
创建一个订阅,用于使用 Pub/Sub 中的消息。如需详细了解订阅,请点击此处阅读官方文档。
gcloud pubsub subscriptions create --topic database-changes change-subscriber
3. 模拟数据库更改
步骤
- 创建一个 Python 脚本(例如
simulate_cdc.py
)来模拟数据库更改并将其发布到 Pub/Sub。
from google.cloud import pubsub_v1
import json
import time
import random
project_id = "YOUR_PROJECT_ID" # Replace with your GCP project ID
topic_id = "database-changes"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
def publish_message(data):
data_str = json.dumps(data).encode("utf-8")
future = publisher.publish(topic_path, data=data_str)
print(f"Published message ID: {future.result()}")
def simulate_change():
change_types = ["INSERT", "UPDATE", "DELETE"]
change_type = random.choice(change_types)
record_id = random.randint(1, 100)
timestamp = time.time()
change_data = {
"change_type": change_type,
"record_id": record_id,
"timestamp": timestamp,
"data": {"field1": "value1", "field2": "value2"}, #Place holder data.
}
publish_message(change_data)
if __name__ == "__main__":
while True:
simulate_change()
time.sleep(2) # Simulate changes every 2 seconds
将 YOUR_PROJECT_ID 替换为您的实际 GCP 项目 ID
- 安装 Pub/Sub 客户端库:
pip install google-cloud-pubsub
- 在终端上运行该脚本。此脚本将持续运行,并每 2 秒向 Pub/Sub 主题发布一条消息。
python simulate_cdc.py
- 运行脚本大约 1 分钟后,Pub/Sub 中就会有足够的消息可供使用。您可以按 Ctrl + C 或 Cmd + C 终止正在运行的 Python 脚本,具体取决于您的操作系统。
- 查看已发布的消息:
打开另一个终端并运行以下命令,查看已发布的消息:
gcloud pubsub subscriptions pull --auto-ack change-subscriber
您应该会看到一个包含消息和其他字段的表格行:
{"change_type": "UPDATE", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}
说明
- Python 脚本通过随机生成
INSERT
、UPDATE
或DELETE
事件来模拟数据库更改。 - 每个更改都表示为一个 JSON 对象,其中包含更改类型、记录 ID、时间戳和数据。
- 该脚本使用 Cloud Pub/Sub 客户端库将这些更改事件发布到
database-changes
主题。 - 借助订阅方命令,您可以查看发送到 pub/sub 主题的消息。
4. 为 Dataproc 创建服务账号
在本部分中,您将创建一个可供 Dataproc 集群使用的服务账号。您还可以为该服务账号分配必要的权限,以允许集群实例访问 Cloud Pub/Sub 和 Dataproc。
- 创建服务账号:
export SERVICE_ACCOUNT_NAME="dataproc-service-account"
gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
- 添加 Dataproc 工作器角色,以允许服务账号创建集群并运行作业。在以下命令中将上一个命令生成的服务账号 ID 添加为成员:
export PROJECT=$(gcloud info --format='value(config.project)')
gcloud projects add-iam-policy-binding $PROJECT \
--role roles/dataproc.worker \
--member="serviceAccount:<your-service-account-with-domain>"
- 添加 Pub/Sub 订阅者角色,以允许服务账号订阅“change-subscriber”Pub/Sub 订阅:
gcloud beta pubsub subscriptions add-iam-policy-binding \
change-subscriber \
--role roles/pubsub.subscriber \
--member="serviceAccount:<your-service-account-with-domain"
5. 创建 Dataproc 集群
Dataproc 集群将运行 Spark 应用,该应用将处理 Pub/Sub 中的消息。您需要在上一部分中创建的服务账号。Dataproc 将此服务账号分配给集群中的每个实例,以便所有实例都获得正确的应用运行权限。
使用以下命令创建 Dataproc 集群:
gcloud dataproc clusters create cdc-dataproc-cluster \
--region=us-central1 \
--zone=us-central1-a \
--scopes=pubsub,datastore \
--image-version=1.3 \
--service-account="<your-service-account-with-domain-id>"
6. 将 Spark 作业提交到 Dataproc 集群
Spark 流式应用会处理 Pub/Sub 中的数据库更改消息,并将其输出到控制台。
步骤
- 创建一个目录,并将使用方的源代码添加到 PubsubConsumer.scala 文件中
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala
package demo
import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object PubsubConsumer {
def createContext(projectID: String, checkpointDirectory: String)
: StreamingContext = {
// [START stream_setup]
val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
val ssc = new StreamingContext(sparkConf, Seconds(1))
// Set the checkpoint directory
val yarnTags = sparkConf.get("spark.yarn.tags")
val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
ssc.checkpoint(checkpointDirectory + '/' + jobId)
// Create stream
val messagesStream: DStream[String] = PubsubUtils
.createStream(
ssc,
projectID,
None,
"change-subscriber", // Cloud Pub/Sub subscription for incoming database updates
SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
.map(message => new String(message.getData(), StandardCharsets.UTF_8))
// [END stream_setup]
processStringDStream(messagesStream)
ssc
}
def processStringDStream(stringDStream: DStream[String]): Unit = {
stringDStream.foreachRDD { rdd =>
if (!rdd.isEmpty()) {
val listOfStrings: List[String] = rdd.collect().toList
listOfStrings.foreach(str => println(s"message received: $str"))
} else {
println("looking for message...")
}
}
}
def main(args: Array[String]): Unit = {
if (args.length != 2) {
System.err.println("arguments are not passed correctly!")
System.exit(1)
}
val Seq(projectID, checkpointDirectory) = args.toSeq
// Create Spark context
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
() => createContext(projectID, checkpointDirectory))
// Start streaming until we receive an explicit termination
ssc.start()
ssc.awaitTermination()
}
}
- 创建以下内容并将其添加到 pom.xml
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<jvm.options.xms>-Xms512M</jvm.options.xms>
<jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
<jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
<jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
</properties>
<groupId>dataproc-spark-demos</groupId>
<artifactId>spark-streaming-pubsub-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-datastore</artifactId>
<version>1.34.0</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>spark-streaming-pubsub_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_2.11</artifactId>
<version>1.14.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.3.2</version>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
</goals>
<phase>compile</phase>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
<phase>test-compile</phase>
</execution>
<execution>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>demo.PubsubConsumer</mainClass>
</transformer>
</transformers>
<relocations>
<relocation>
<pattern>com</pattern>
<shadedPattern>repackaged.com</shadedPattern>
<includes>
<include>com.google.protobuf.**</include>
<include>com.google.common.**</include>
</includes>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
- 切换到项目的 spark 目录,并将路径保存到环境变量中以供稍后使用:
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
- 更改目录:
cd $REPO_ROOT/spark
- 下载 Java 1.8 并将该文件夹放置在 /usr/lib/jvm/ 中。然后,将 JAVA_HOME 更改为指向以下位置:
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
- 构建应用 jar
mvn clean package
在 spark/target
目录中创建包含应用代码和依赖项的 spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar 归档
- 提交 Spark 应用:
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"
gcloud dataproc jobs submit spark \
--cluster cdc-dataproc-cluster \
--region us-central1 \
--async \
--jar target/$JAR \
--max-failures-per-hour 10 \
--properties $SPARK_PROPERTIES \
-- $ARGUMENTS
- 显示活跃的作业列表并记下作业的
JOB_ID
值:
gcloud dataproc jobs list --region=us-central1 --state-filter=active
输出将如下所示
JOB_ID TYPE STATUS
473ecb6d14e2483cb88a18988a5b2e56 spark RUNNING
- 在浏览器中打开以下网址查看作业输出。将 [JOB_ID] 替换为上一步中记录的值。
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
- 输出类似于以下内容:
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
在 Dataproc 中运行的 Spark 流式作业会从 Pub/Sub 拉取消息、对其进行处理,并将输出显示在控制台中。
- 终止作业:运行以下命令以终止作业。将 JOB_ID 替换为我们之前记下的 ID
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet
恭喜!您刚刚创建了一个强大的 CDC 流水线,该流水线会捕获 Pub/Sub 中的数据库更改,并使用在 Cloud Dataproc 中运行的 Spark 流式处理这些更改。
7. 清理
清理您创建的所有资源,避免日后再为这些资源支付费用。若要避免产生费用,最简单的方法是删除您为本教程创建的项目。或者,您可以删除单个资源。
运行以下命令以删除单个资源
gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics delete database-changes --quiet
gcloud pubsub subscriptions delete change-subscriber --quiet
gcloud iam service-accounts delete <your-service-account-with-domain> --quiet
8. 恭喜
恭喜,您刚刚完成了一个实操 Codelab,该 Codelab 演示了如何使用 Google Cloud Platform 构建强大的实时数据流水线。我们来回顾一下您已完成的内容:
- 模拟变更数据捕获 (CDC):您学习了 CDC 的基础知识,并实现了一个 Python 脚本来模拟数据库更改,生成表示实时数据修改的事件。
- 利用 Cloud Pub/Sub:您设置 Cloud Pub/Sub 主题和订阅,从而提供可扩缩且可靠的消息传递服务,用于流式传输 CDC 事件。您已将模拟的数据库更改发布到 Pub/Sub,从而创建了实时数据流。
- 使用 Dataproc 和 Spark 处理的数据:您已预配 Dataproc 集群并部署了 Spark Streaming 作业,以使用 Pub/Sub 订阅中的消息。您已实时处理和转换传入的 CDC 事件,并在 Dataproc 作业日志中显示结果。
- 构建了端到端实时数据流水线:您成功集成了这些服务,创建了一个完整的数据流水线,可实时捕获、流式传输和处理数据更改。您获得了构建可处理连续数据流的系统的实用经验。
- 使用了 Spark Pub/Sub 连接器:您已成功配置 Dataproc 集群以使用 Spark Pub/Sub 连接器,这对于 Spark Structured Streaming 从 Pub/Sub 读取数据至关重要。
现在,您已经打下了坚实的基础,可以为各种应用(包括实时分析、数据仓库和微服务架构)构建更复杂、更精细的数据流水线。继续探索和构建!