Xây dựng tính năng Thu thập dữ liệu thay đổi bằng Dataproc và Cloud Pub/Sub

Xây dựng tính năng Thu thập dữ liệu thay đổi bằng Dataproc và Cloud Pub/Sub

Thông tin về lớp học lập trình này

subjectLần cập nhật gần đây nhất: thg 6 19, 2025
account_circleTác giả: Jatin Narula

1. Giới thiệu

df8070bd84336207.png

Lần cập nhật gần đây nhất: ngày 19 tháng 6 năm 2025

Công nghệ ghi nhận thay đổi dữ liệu là gì?

Tính năng Thu thập dữ liệu thay đổi (CDC) là một tập hợp các mẫu thiết kế phần mềm dùng để xác định và theo dõi dữ liệu đã thay đổi trong cơ sở dữ liệu. Nói một cách đơn giản, đây là cách ghi lại và ghi lại các thay đổi đối với dữ liệu để có thể sao chép các thay đổi đó cho các hệ thống khác.

Tính năng Thu thập dữ liệu thay đổi (CDC) cực kỳ hữu ích trong nhiều tình huống dựa trên dữ liệu như Di chuyển dữ liệu, Phân tích và kho dữ liệu theo thời gian thực, Khôi phục thảm họa và Khả năng hoạt động cao, Kiểm tra và tuân thủ, v.v.

Di chuyển dữ liệu

CDC đơn giản hoá các dự án di chuyển dữ liệu bằng cách cho phép truyền dữ liệu gia tăng, giảm thời gian ngừng hoạt động và giảm thiểu sự gián đoạn.

Lưu trữ dữ liệu và phân tích theo thời gian thực

CDC đảm bảo rằng kho dữ liệu và hệ thống phân tích liên tục được cập nhật những thay đổi mới nhất từ cơ sở dữ liệu hoạt động.

Nhờ đó, các doanh nghiệp có thể đưa ra quyết định dựa trên thông tin theo thời gian thực.

Phục hồi sau thảm hoạ và khả năng đáp ứng cao

CDC cho phép sao chép dữ liệu theo thời gian thực vào cơ sở dữ liệu phụ cho mục đích khôi phục sau thảm hoạ. Trong trường hợp xảy ra lỗi, CDC cho phép chuyển sang cơ sở dữ liệu phụ một cách nhanh chóng, giảm thiểu thời gian ngừng hoạt động và mất dữ liệu.

Kiểm tra và tuân thủ

CDC cung cấp một nhật ký kiểm tra chi tiết về các thay đổi đối với dữ liệu. Đây là yếu tố cần thiết để tuân thủ các yêu cầu theo quy định.

Sản phẩm bạn sẽ tạo ra

Trong lớp học lập trình này, bạn sẽ xây dựng một quy trình dữ liệu thu thập dữ liệu thay đổi (CDC) bằng cách sử dụng Cloud Pub/Sub, Dataproc, Python và Apache Spark. Quy trình của bạn sẽ:

  • Mô phỏng các thay đổi đối với cơ sở dữ liệu và phát hành các thay đổi đó dưới dạng sự kiện đến Cloud Pub/Sub, một dịch vụ nhắn tin đáng tin cậy và có thể mở rộng quy mô.
  • Tận dụng sức mạnh của Dataproc, dịch vụ Spark và Hadoop được quản lý của Google Cloud, để xử lý các sự kiện này theo thời gian thực.

Bằng cách kết nối các dịch vụ này, bạn sẽ tạo ra một quy trình mạnh mẽ có thể ghi lại và xử lý các thay đổi về dữ liệu khi chúng xảy ra, cung cấp nền tảng cho các hoạt động phân tích theo thời gian thực, lưu trữ dữ liệu và các ứng dụng quan trọng khác.

Kiến thức bạn sẽ học được

  • Cách tạo quy trình thu thập dữ liệu thay đổi cơ bản
  • Dataproc để xử lý luồng
  • Cloud Pub/Sub để nhắn tin theo thời gian thực
  • Kiến thức cơ bản về Apache Spark

Lớp học lập trình này tập trung vào Dataproc và Cloud Pub/Sub. Các khái niệm và khối mã không liên quan được tinh chỉnh và cung cấp cho bạn, chỉ cần sao chép và dán.

Bạn cần có

  • một tài khoản GCP đang hoạt động và đã thiết lập dự án. Nếu chưa có tài khoản, bạn có thể đăng ký dùng thử miễn phí.
  • Đã cài đặt và định cấu hình gcloud CLI.
  • Đã cài đặt Python 3.7 trở lên để mô phỏng các thay đổi đối với cơ sở dữ liệu và tương tác với Pub/Sub.
  • Kiến thức cơ bản về Dataproc, Cloud Pub/Sub, Apache Spark và Python.

Trước khi bạn bắt đầu

Thực thi lệnh sau trong dòng lệnh để bật các API bắt buộc:

gcloud services enable \
    dataproc
.googleapis.com \
    pubsub
.googleapis.com \

2. Thiết lập Cloud Pub/Sub

Tạo chủ đề

Chủ đề này sẽ được dùng để phát hành các thay đổi đối với cơ sở dữ liệu. Công việc Dataproc sẽ là người dùng của các thông báo này và sẽ xử lý các thông báo để thu thập dữ liệu thay đổi. Nếu muốn biết thêm về chủ đề, bạn có thể đọc tài liệu chính thức tại đây.

gcloud pubsub topics create database-changes

Tạo gói thuê bao

Tạo một gói thuê bao sẽ được dùng để sử dụng các thông báo trong Pub/Sub. Để biết thêm về gói thuê bao, bạn có thể đọc tài liệu chính thức tại đây.

gcloud pubsub subscriptions create --topic database-changes change-subscriber

3. Mô phỏng các thay đổi đối với cơ sở dữ liệu

Các bước

  1. Tạo tập lệnh Python (ví dụ: simulate_cdc.py) để mô phỏng các thay đổi đối với cơ sở dữ liệu và phát hành các thay đổi đó lên Pub/Sub.
from google.cloud import pubsub_v1
import json
import time
import random

project_id = "YOUR_PROJECT_ID"  # Replace with your GCP project ID
topic_id = "database-changes"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

def publish_message(data):
   
data_str = json.dumps(data).encode("utf-8")
   
future = publisher.publish(topic_path, data=data_str)
   
print(f"Published message ID: {future.result()}")

def simulate_change():
   
change_types = ["INSERT", "UPDATE", "DELETE"]
   
change_type = random.choice(change_types)
   
record_id = random.randint(1, 100)
   
timestamp = time.time()
   
change_data = {
       
"change_type": change_type,
       
"record_id": record_id,
       
"timestamp": timestamp,
       
"data": {"field1": "value1", "field2": "value2"}, #Place holder data.
   
}
   
publish_message(change_data)

if __name__ == "__main__":
   
while True:
       
simulate_change()
       
time.sleep(2)  # Simulate changes every 2 seconds

Thay thế YOUR_PROJECT_ID bằng mã dự án GCP thực tế của bạn

  1. Cài đặt thư viện ứng dụng Pub/Sub:
pip install google-cloud-pubsub
  1. Chạy tập lệnh trên thiết bị đầu cuối. Tập lệnh này sẽ chạy liên tục và phát hành thông báo mỗi 2 giây cho chủ đề Pub/Sub.
python simulate_cdc.py
  1. Sau khi chạy tập lệnh trong khoảng 1 phút, bạn sẽ có đủ thông báo trong Pub/Sub để sử dụng. Bạn có thể chấm dứt tập lệnh python đang chạy bằng cách nhấn tổ hợp phím ctrl + C hoặc Cmd + C, tuỳ thuộc vào hệ điều hành của bạn.
  2. Xem thông báo đã xuất bản:

Mở một cửa sổ dòng lệnh khác và chạy lệnh sau để xem các thông báo đã xuất bản:

gcloud pubsub subscriptions pull --auto-ack change-subscriber

Bạn sẽ thấy một hàng trong bảng chứa thông báo và các trường khác:

{"change_type": "UPDATE", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}

Giải thích

  • Tập lệnh Python mô phỏng các thay đổi đối với cơ sở dữ liệu bằng cách tạo ngẫu nhiên các sự kiện INSERT, UPDATE hoặc DELETE.
  • Mỗi thay đổi được biểu thị dưới dạng một đối tượng JSON chứa loại thay đổi, mã bản ghi, dấu thời gian và dữ liệu.
  • Tập lệnh này sử dụng thư viện ứng dụng Cloud Pub/Sub để phát hành các sự kiện thay đổi này đến chủ đề database-changes.
  • Lệnh subscriber cho phép bạn xem các thông báo đang được gửi đến chủ đề pub/sub.

4. Tạo tài khoản dịch vụ cho Dataproc

Trong phần này, bạn sẽ tạo một Tài khoản dịch vụ mà cụm Dataproc có thể sử dụng. Bạn cũng chỉ định các quyền cần thiết để cho phép các thực thể cụm truy cập vào Cloud Pub/Sub và Dataproc.

  1. Tạo tài khoản dịch vụ:
export SERVICE_ACCOUNT_NAME="dataproc-service-account"

gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
  1. Thêm vai trò worker Dataproc để cho phép tài khoản dịch vụ tạo cụm và chạy công việc. Thêm mã tài khoản dịch vụ được tạo trong lệnh trước làm thành viên trong lệnh dưới đây:
export PROJECT=$(gcloud info --format='value(config.project)')

gcloud projects add-iam-policy-binding $PROJECT \
        --role roles/dataproc.worker \
        --member="serviceAccount:<your-service-account-with-domain>"
  1. Thêm vai trò người đăng ký Pub/sub để cho phép tài khoản dịch vụ đăng ký gói thuê bao Pub/sub "change-subscriber":
gcloud beta pubsub subscriptions add-iam-policy-binding \
        change
-subscriber \
       
--role roles/pubsub.subscriber \
       
--member="serviceAccount:<your-service-account-with-domain"

5. Tạo cụm Dataproc

Cụm Dataproc sẽ chạy ứng dụng spark để xử lý các thông báo trong Pub/sub. Bạn sẽ cần tài khoản dịch vụ được tạo trong phần trước. Dataproc chỉ định tài khoản dịch vụ này cho mọi thực thể trong cụm để tất cả thực thể đều có quyền chính xác để chạy ứng dụng.

Sử dụng lệnh sau để tạo cụm Dataproc:

gcloud dataproc clusters create cdc-dataproc-cluster \
    --region=us-central1 \
    --zone=us-central1-a \
    --scopes=pubsub,datastore \
    --image-version=1.3 \
    --service-account="<your-service-account-with-domain-id>"

6. Gửi công việc Spark đến cụm Dataproc

Ứng dụng phát trực tuyến Spark xử lý các thông báo thay đổi cơ sở dữ liệu trong Pub/sub và in các thông báo đó vào bảng điều khiển.

Các bước

  1. Tạo một thư mục và thêm mã nguồn của trình tiêu thụ vào tệp PubsubConsumer.scala
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc
-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala
package demo

import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PubsubConsumer {

 
def createContext(projectID: String, checkpointDirectory: String)
   
: StreamingContext = {

   
// [START stream_setup]
   
val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
   
val ssc = new StreamingContext(sparkConf, Seconds(1))

   
// Set the checkpoint directory
   
val yarnTags = sparkConf.get("spark.yarn.tags")
   
val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
   
ssc.checkpoint(checkpointDirectory + '/' + jobId)
   
   
// Create stream
   
val messagesStream: DStream[String] = PubsubUtils
     
.createStream(
       
ssc,
       
projectID,
       
None,
       
"change-subscriber",  // Cloud Pub/Sub subscription for incoming database updates
       
SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
     
.map(message => new String(message.getData(), StandardCharsets.UTF_8))
   
// [END stream_setup]

   
processStringDStream(messagesStream)
   
       
ssc
 
}

 
def processStringDStream(stringDStream: DStream[String]): Unit = {
   
stringDStream.foreachRDD { rdd =>
     
if (!rdd.isEmpty()) {
       
val listOfStrings: List[String] = rdd.collect().toList
       
listOfStrings.foreach(str => println(s"message received: $str"))
     
} else {
       
println("looking for message...")
     
}
   
}
 
}

 
def main(args: Array[String]): Unit = {
   
if (args.length != 2) {
     
System.err.println("arguments are not passed correctly!")
     
System.exit(1)
   
}

   
val Seq(projectID, checkpointDirectory) = args.toSeq

   
// Create Spark context
   
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
     
() => createContext(projectID, checkpointDirectory))

   
// Start streaming until we receive an explicit termination
   
ssc.start()
   
ssc.awaitTermination()
 
}

}
  1. Tạo và thêm nội dung sau vào pom.xml
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <jvm.options.xms>-Xms512M</jvm.options.xms>
    <jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
    <jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
    <jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
  </properties>

  <groupId>dataproc-spark-demos</groupId>
  <artifactId>spark-streaming-pubsub-demo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.2.0</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-datastore</artifactId>
      <version>1.34.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>spark-streaming-pubsub_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>

    <dependency>
      <groupId>org.scalacheck</groupId>
      <artifactId>scalacheck_2.11</artifactId>
      <version>1.14.0</version>
      <scope>test</scope>
    </dependency>

  </dependencies>

  <build>
    <plugins>

      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.3.2</version>
        <executions>
          <execution>
            <id>compile</id>
            <goals>
              <goal>compile</goal>
            </goals>
            <phase>compile</phase>
          </execution>
          <execution>
            <id>test-compile</id>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <phase>test-compile</phase>
          </execution>
          <execution>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>demo.PubsubConsumer</mainClass>
                </transformer>
              </transformers>
              <relocations>
                <relocation>
                  <pattern>com</pattern>
                  <shadedPattern>repackaged.com</shadedPattern>
                  <includes>
                    <include>com.google.protobuf.**</include>
                    <include>com.google.common.**</include>
                  </includes>
                </relocation>
              </relocations>
            </configuration>
          </execution>
        </executions>
      </plugin>

    </plugins>
  </build>

</project>
  1. Thay đổi thành thư mục spark của dự án và lưu đường dẫn trong một biến môi trường để sử dụng sau:
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
  1. Thay đổi thư mục:
cd $REPO_ROOT/spark
  1. Tải Java 1.8 xuống và đặt thư mục này vào /usr/lib/jvm/. Sau đó, hãy thay đổi JAVA_HOME để trỏ đến đường dẫn sau:
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
  1. Tạo tệp jar của ứng dụng
mvn clean package

Tệp lưu trữ spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar chứa mã ứng dụng và các phần phụ thuộc được tạo trong thư mục spark/target

  1. Gửi ứng dụng Spark:
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"

gcloud dataproc jobs submit spark \
    --cluster cdc-dataproc-cluster \
    --region us-central1 \
    --async \
    --jar target/$JAR \
    --max-failures-per-hour 10 \
    --properties $SPARK_PROPERTIES \
    -- $ARGUMENTS
  1. Hiển thị danh sách các công việc đang hoạt động và ghi lại giá trị JOB_ID cho công việc:
gcloud dataproc jobs list --region=us-central1 --state-filter=active

Kết quả sẽ có dạng như sau

JOB_ID                            TYPE   STATUS
473ecb6d14e2483cb88a18988a5b2e56  spark  RUNNING
  1. Xem kết quả công việc bằng cách mở URL sau trong trình duyệt. Thay thế [JOB_ID] bằng giá trị đã ghi chú ở bước trước.
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
  1. Kết quả sẽ tương tự như sau:
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...

Công việc truyền phát Spark chạy trong Dataproc sẽ lấy tin nhắn từ Pub/sub, xử lý các tin nhắn đó và hiển thị kết quả trên bảng điều khiển.

  1. Chấm dứt công việc: Chạy lệnh sau để chấm dứt công việc. Thay thế JOB_ID bằng mã nhận dạng mà chúng ta đã ghi chú trước đó
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet

Xin chúc mừng! Bạn vừa tạo một quy trình CDC mạnh mẽ để ghi lại các thay đổi về cơ sở dữ liệu trong Pub/sub và xử lý các thay đổi đó bằng cách sử dụng luồng Spark chạy trong Cloud Dataproc.

7. Dọn dẹp

Hãy xoá mọi tài nguyên bạn đã tạo để không bị tính phí cho các tài nguyên đó trong tương lai. Cách dễ nhất để loại bỏ tính năng thanh toán là xoá dự án bạn đã tạo cho hướng dẫn này. Ngoài ra, bạn có thể xoá từng tài nguyên.

Chạy các lệnh sau để xoá từng tài nguyên

gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics
delete database-changes --quiet
gcloud pubsub subscriptions
delete change-subscriber --quiet
gcloud iam service
-accounts delete <your-service-account-with-domain> --quiet

8. Xin chúc mừng

Xin chúc mừng! Bạn vừa hoàn thành một lớp học lập trình thực hành minh hoạ cách xây dựng một quy trình dữ liệu mạnh mẽ theo thời gian thực bằng Google Cloud Platform. Hãy cùng tóm tắt những gì bạn đã làm được:

  • Mô phỏng tính năng Thu thập dữ liệu thay đổi (CDC): Bạn đã tìm hiểu các kiến thức cơ bản về CDC và triển khai tập lệnh Python để mô phỏng các thay đổi đối với cơ sở dữ liệu, tạo ra các sự kiện thể hiện việc sửa đổi dữ liệu theo thời gian thực.
  • Tận dụng Cloud Pub/Sub: Bạn thiết lập các chủ đề và gói thuê bao Cloud Pub/Sub, cung cấp dịch vụ nhắn tin đáng tin cậy và có thể mở rộng để truyền phát các sự kiện CDC. Bạn đã phát hành các thay đổi mô phỏng cơ sở dữ liệu của mình lên Pub/Sub, tạo ra một luồng dữ liệu theo thời gian thực.
  • Dữ liệu đã xử lý bằng Dataproc và Spark: Bạn đã cấp phép một cụm Dataproc và triển khai một công việc Spark Streaming để sử dụng thông báo từ gói thuê bao Pub/Sub. Bạn đã xử lý và chuyển đổi các sự kiện CDC sắp tới theo thời gian thực, hiển thị kết quả trong nhật ký công việc Dataproc.
  • Tạo quy trình toàn diện theo thời gian thực: Bạn đã tích hợp thành công các dịch vụ này để tạo một quy trình dữ liệu hoàn chỉnh, giúp ghi lại, truyền trực tuyến và xử lý các thay đổi về dữ liệu theo thời gian thực. Bạn đã có kinh nghiệm thực tế trong việc xây dựng một hệ thống có thể xử lý các luồng dữ liệu liên tục.
  • Sử dụng Trình kết nối Spark Pub/Sub: Bạn đã định cấu hình thành công một cụm Dataproc để sử dụng trình kết nối Spark Pub/Sub. Đây là yếu tố quan trọng để Spark Structured Streaming đọc dữ liệu từ Pub/Sub.

Giờ đây, bạn đã có nền tảng vững chắc để xây dựng các quy trình dữ liệu phức tạp và tinh vi hơn cho nhiều ứng dụng, bao gồm cả phân tích theo thời gian thực, kho dữ liệu và cấu trúc dịch vụ vi mô. Hãy tiếp tục khám phá và xây dựng!

Tài liệu tham khảo