使用 Dataproc 和 Cloud Pub/Sub 建構變更資料擷取系統

使用 Dataproc 和 Cloud Pub/Sub 建構變更資料擷取系統

程式碼研究室簡介

subject上次更新時間:6月 19, 2025
account_circle作者:Jatin Narula

1. 簡介

df8070bd84336207.png

上次更新時間:2025 年 6 月 19 日

什麼是變更資料擷取?

變更資料擷取 (CDC) 是一組軟體設計模式,用於判斷及追蹤資料庫中已變更的資料。簡單來說,這項功能可擷取及記錄資料的變更,以便將這些變更複製到其他系統。

變更資料擷取 (CDC) 在許多資料驅動型情境中都非常實用,例如資料遷移、即時資料倉儲和分析、災難復原和高可用性、稽核和法規遵循等。

資料遷移

CDC 可讓您逐步傳輸資料,減少停機時間並盡量減少中斷情形,簡化資料遷移專案。

即時資料倉儲和分析

變更資料擷取可確保資料倉儲和分析系統持續更新營運資料庫的最新變更。

讓企業能根據即時資訊做出決策。

災難復原和高可用性

變更資料擷取可讓您將資料即時複製到次要資料庫,以利災難復原。發生故障時,CDC 可快速將資料容錯移轉至次要資料庫,盡量減少停機時間和資料遺失。

稽核和法規遵循

CDC 會提供詳細的資料變更稽核追蹤,這對於遵循法規和法規要求至關重要。

建構項目

在本程式碼研究室中,您將使用 Cloud Pub/Sub、Dataproc、Python 和 Apache Spark 建構變更資料擷取 (CDC) 資料管道。管道會執行以下操作:

  • 模擬資料庫變更,並將變更事件發布至可擴充且可靠的訊息傳遞服務 Cloud Pub/Sub。
  • 善用 Google Cloud 代管的 Spark 和 Hadoop 服務 Dataproc 的強大功能,即時處理這些事件。

連結這些服務後,您就能建立可靠的管道,在資料變更發生時即時擷取及處理資料,為即時分析、資料倉儲和其他重要應用程式提供基礎。

課程內容

本程式碼研究室著重於 Dataproc 和 Cloud Pub/Sub。我們不會對與本主題無關的概念和程式碼多做介紹,但會事先準備好這些程式碼區塊,屆時您只要複製及貼上即可。

軟硬體需求

  • 有效的 GCP 帳戶,並已設定專案。如果你還沒有 Google 帳戶,可以註冊免費試用。
  • 已安裝並設定 gcloud CLI
  • 安裝 Python 3.7 以上版本,用於模擬資料庫變更並與 Pub/Sub 互動。
  • 具備 Dataproc、Cloud Pub/Sub、Apache Spark 和 Python 的基本知識。

事前準備

在終端機中執行下列指令,啟用必要的 API:

gcloud services enable \
    dataproc
.googleapis.com \
    pubsub
.googleapis.com \

2. 設定 Cloud Pub/Sub

建立主題

這個主題會用來發布資料庫變更。Dataproc 工作會成為這些訊息的使用者,並處理訊息以擷取變更資料。如要進一步瞭解這些主題,請參閱官方說明文件

gcloud pubsub topics create database-changes

建立訂閱項目

建立用於在 Pub/Sub 中取用訊息的訂閱項目。如要進一步瞭解訂閱項目,請參閱官方說明文件

gcloud pubsub subscriptions create --topic database-changes change-subscriber

3. 模擬資料庫變更

步驟

  1. 建立 Python 指令碼 (例如simulate_cdc.py) 模擬資料庫變更,並將變更發布至 Pub/Sub。
from google.cloud import pubsub_v1
import json
import time
import random

project_id = "YOUR_PROJECT_ID"  # Replace with your GCP project ID
topic_id = "database-changes"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

def publish_message(data):
   
data_str = json.dumps(data).encode("utf-8")
   
future = publisher.publish(topic_path, data=data_str)
   
print(f"Published message ID: {future.result()}")

def simulate_change():
   
change_types = ["INSERT", "UPDATE", "DELETE"]
   
change_type = random.choice(change_types)
   
record_id = random.randint(1, 100)
   
timestamp = time.time()
   
change_data = {
       
"change_type": change_type,
       
"record_id": record_id,
       
"timestamp": timestamp,
       
"data": {"field1": "value1", "field2": "value2"}, #Place holder data.
   
}
   
publish_message(change_data)

if __name__ == "__main__":
   
while True:
       
simulate_change()
       
time.sleep(2)  # Simulate changes every 2 seconds

將 YOUR_PROJECT_ID 替換為實際的 GCP 專案 ID

  1. 安裝 Pub/Sub 用戶端程式庫:
pip install google-cloud-pubsub
  1. 在終端機中執行指令碼。這個指令碼會持續執行,並每 2 秒發布一次訊息至 Pub/Sub 主題。
python simulate_cdc.py
  1. 執行指令碼 1 分鐘後,Pub/Sub 中就會有足夠的訊息可供使用。您可以按下 Ctrl + C 或 Cmd + C 來終止執行中的 Python 指令碼 (視作業系統而定)。
  2. 查看已發布的訊息:

開啟另一個終端機並執行下列指令,即可查看已發布的訊息:

gcloud pubsub subscriptions pull --auto-ack change-subscriber

您應該會看到包含訊息和其他欄位的資料表列:

{"change_type": "UPDATE", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}

說明

  • Python 指令碼會隨機產生 INSERTUPDATEDELETE 事件,模擬資料庫變更。
  • 每個變更都會以 JSON 物件的形式呈現,其中包含變更類型、記錄 ID、時間戳記和資料。
  • 這個指令碼會使用 Cloud Pub/Sub 用戶端程式庫,將這些變更事件發布至 database-changes 主題。
  • 您可以使用 subscriber 指令查看傳送至 Pub/Sub 主題的訊息。

4. 為 Dataproc 建立服務帳戶

在本節中,您將瞭解如何建立 Dataproc 叢集可使用的服務帳戶。您也可以指派必要的權限,允許叢集執行個體存取 Cloud Pub/Sub 和 Dataproc。

  1. 建立服務帳戶:
export SERVICE_ACCOUNT_NAME="dataproc-service-account"

gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
  1. 新增 Dataproc 工作站角色,允許服務帳戶建立叢集並執行工作。在下列指令中,將先前指令產生的服務帳戶 ID 新增為成員:
export PROJECT=$(gcloud info --format='value(config.project)')

gcloud projects add-iam-policy-binding $PROJECT \
        --role roles/dataproc.worker \
        --member="serviceAccount:<your-service-account-with-domain>"
  1. 新增 Pub/sub 訂閱者角色,允許服務帳戶訂閱「change-subscriber」Pub/sub 訂閱:
gcloud beta pubsub subscriptions add-iam-policy-binding \
        change
-subscriber \
       
--role roles/pubsub.subscriber \
       
--member="serviceAccount:<your-service-account-with-domain"

5. 建立 Dataproc 叢集

Dataproc 叢集會執行 Spark 應用程式,處理 Pub/Sub 中的訊息。您需要在上一節建立的服務帳戶。Dataproc 會將這個服務帳戶指派給叢集中的每個執行個體,讓所有執行個體都取得執行應用程式的正確權限。

使用下列指令建立 Dataproc 叢集:

gcloud dataproc clusters create cdc-dataproc-cluster \
    --region=us-central1 \
    --zone=us-central1-a \
    --scopes=pubsub,datastore \
    --image-version=1.3 \
    --service-account="<your-service-account-with-domain-id>"

6. 將 Spark 工作提交至 Dataproc 叢集

Spark 串流應用程式會在 Pub/Sub 中處理資料庫變更訊息,並將訊息輸出至主控台。

步驟

  1. 建立目錄,並將使用者的原始碼新增至 PubsubConsumer.scala 檔案
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc
-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala
package demo

import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PubsubConsumer {

 
def createContext(projectID: String, checkpointDirectory: String)
   
: StreamingContext = {

   
// [START stream_setup]
   
val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
   
val ssc = new StreamingContext(sparkConf, Seconds(1))

   
// Set the checkpoint directory
   
val yarnTags = sparkConf.get("spark.yarn.tags")
   
val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
   
ssc.checkpoint(checkpointDirectory + '/' + jobId)
   
   
// Create stream
   
val messagesStream: DStream[String] = PubsubUtils
     
.createStream(
       
ssc,
       
projectID,
       
None,
       
"change-subscriber",  // Cloud Pub/Sub subscription for incoming database updates
       
SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
     
.map(message => new String(message.getData(), StandardCharsets.UTF_8))
   
// [END stream_setup]

   
processStringDStream(messagesStream)
   
       
ssc
 
}

 
def processStringDStream(stringDStream: DStream[String]): Unit = {
   
stringDStream.foreachRDD { rdd =>
     
if (!rdd.isEmpty()) {
       
val listOfStrings: List[String] = rdd.collect().toList
       
listOfStrings.foreach(str => println(s"message received: $str"))
     
} else {
       
println("looking for message...")
     
}
   
}
 
}

 
def main(args: Array[String]): Unit = {
   
if (args.length != 2) {
     
System.err.println("arguments are not passed correctly!")
     
System.exit(1)
   
}

   
val Seq(projectID, checkpointDirectory) = args.toSeq

   
// Create Spark context
   
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
     
() => createContext(projectID, checkpointDirectory))

   
// Start streaming until we receive an explicit termination
   
ssc.start()
   
ssc.awaitTermination()
 
}

}
  1. 建立並新增下列內容至 pom.xml
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <jvm.options.xms>-Xms512M</jvm.options.xms>
    <jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
    <jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
    <jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
  </properties>

  <groupId>dataproc-spark-demos</groupId>
  <artifactId>spark-streaming-pubsub-demo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.2.0</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-datastore</artifactId>
      <version>1.34.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>spark-streaming-pubsub_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>

    <dependency>
      <groupId>org.scalacheck</groupId>
      <artifactId>scalacheck_2.11</artifactId>
      <version>1.14.0</version>
      <scope>test</scope>
    </dependency>

  </dependencies>

  <build>
    <plugins>

      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.3.2</version>
        <executions>
          <execution>
            <id>compile</id>
            <goals>
              <goal>compile</goal>
            </goals>
            <phase>compile</phase>
          </execution>
          <execution>
            <id>test-compile</id>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <phase>test-compile</phase>
          </execution>
          <execution>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>demo.PubsubConsumer</mainClass>
                </transformer>
              </transformers>
              <relocations>
                <relocation>
                  <pattern>com</pattern>
                  <shadedPattern>repackaged.com</shadedPattern>
                  <includes>
                    <include>com.google.protobuf.**</include>
                    <include>com.google.common.**</include>
                  </includes>
                </relocation>
              </relocations>
            </configuration>
          </execution>
        </executions>
      </plugin>

    </plugins>
  </build>

</project>
  1. 變更為專案的 Spark 目錄,並將路徑儲存至環境變數以供日後使用:
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
  1. 變更目錄:
cd $REPO_ROOT/spark
  1. 下載 Java 1.8,並將資料夾放在 /usr/lib/jvm/ 中。然後將 JAVA_HOME 指向以下位置:
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
  1. 建構應用程式 Jar 檔案
mvn clean package

系統會在 spark/target 目錄中建立含有應用程式程式碼與依附元件的 spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar 封存

  1. 提交 Spark 應用程式:
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"

gcloud dataproc jobs submit spark \
    --cluster cdc-dataproc-cluster \
    --region us-central1 \
    --async \
    --jar target/$JAR \
    --max-failures-per-hour 10 \
    --properties $SPARK_PROPERTIES \
    -- $ARGUMENTS
  1. 顯示執行中的工作清單,並記下工作的 JOB_ID 值:
gcloud dataproc jobs list --region=us-central1 --state-filter=active

輸出內容會類似於

JOB_ID                            TYPE   STATUS
473ecb6d14e2483cb88a18988a5b2e56  spark  RUNNING
  1. 在瀏覽器中開啟下列網址,以查看工作輸出。將 [JOB_ID] 換成您上一步記下的值。
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
  1. 輸出結果會與下列內容相似:
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...

在 Dataproc 中執行的 Spark 串流工作會從 Pub/Sub 提取訊息、處理訊息,並將輸出內容顯示在控制台中。

  1. 終止工作:執行下列指令即可終止工作。將 JOB_ID 換成先前記下的值
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet

恭喜!您剛剛建立了功能強大的 CDC 管道,可在 Pub/Sub 中擷取資料庫變更,並使用在 Cloud Dataproc 中執行的 Spark 串流進行處理。

7. 清除所用資源

清除您建立的任何資源,這樣您日後就不需為其付費。如要避免付費,最簡單的方法就是刪除您針對教學課程建立的專案。或者,您也可以刪除個別資源。

執行下列指令即可刪除個別資源

gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics
delete database-changes --quiet
gcloud pubsub subscriptions
delete change-subscriber --quiet
gcloud iam service
-accounts delete <your-service-account-with-domain> --quiet

8. 恭喜

恭喜!您剛完成實作程式碼研究室,瞭解如何使用 Google Cloud Platform 建構功能強大的即時資料管道。讓我們回顧您完成的內容:

  • 模擬變更資料擷取 (CDC):您已瞭解 CDC 的基本概念,並實作 Python 指令碼來模擬資料庫變更,產生代表即時資料修改的事件。
  • 善用 Cloud Pub/Sub:您可以設定 Cloud Pub/Sub 主題和訂閱項目,提供可擴充且可靠的訊息傳遞服務,用於串流傳輸 CDC 事件。您已將模擬資料庫變更發布至 Pub/Sub,建立即時資料串流。
  • 使用 Dataproc 和 Spark 處理資料:您已佈建 Dataproc 叢集,並部署 Spark 串流工作,以便使用 Pub/Sub 訂閱項目中的訊息。您即時處理及轉換傳入的 CDC 事件,並在 Dataproc 工作記錄中顯示結果。
  • 建立端對端即時管道:您已成功整合這些服務,建立完整的資料管道,可即時擷取、串流及處理資料變更。您已獲得建構可處理連續資料串流的系統實務經驗。
  • 使用 Spark Pub/Sub 連接器:您已成功設定 Dataproc 叢集,以便使用 Spark Pub/Sub 連接器,這對 Spark 結構化串流讀取 Pub/Sub 中的資料至關重要。

您現在已奠定穩固基礎,可為各種應用程式建構更複雜和精密的資料管道,包括即時分析、資料倉儲和微服務架構。請繼續探索及建構!

參考文件