เกี่ยวกับ Codelab นี้
1 บทนำ
อัปเดตล่าสุด: 19-06-2025
การบันทึกการเปลี่ยนแปลงข้อมูลคืออะไร
การบันทึกการเปลี่ยนแปลงข้อมูล (CDC) คือชุดรูปแบบการออกแบบซอฟต์แวร์ที่ใช้เพื่อระบุและติดตามข้อมูลที่เปลี่ยนแปลงในฐานข้อมูล กล่าวอย่างง่ายคือ เป็นวิธีบันทึกและบันทึกการเปลี่ยนแปลงที่ทำกับข้อมูลเพื่อให้ทำซ้ำการเปลี่ยนแปลงเหล่านั้นในระบบอื่นๆ ได้
Change Data Capture (CDC) มีประโยชน์อย่างยิ่งในสถานการณ์ต่างๆ ที่ขับเคลื่อนโดยข้อมูล เช่น การย้ายข้อมูล พื้นที่เก็บข้อมูลและการวิเคราะห์แบบเรียลไทม์ การกู้คืนภัยพิบัติและความพร้อมใช้งานสูง การตรวจสอบและการปฏิบัติตามข้อกำหนด ฯลฯ
การย้ายข้อมูล
CDC ช่วยให้โปรเจ็กต์การย้ายข้อมูลง่ายขึ้นด้วยการอนุญาตให้โอนข้อมูลแบบเพิ่มทีละน้อย ซึ่งจะช่วยลดเวลาหยุดทำงานและลดการหยุดชะงัก
คลังข้อมูลและข้อมูลวิเคราะห์แบบเรียลไทม์
CDC ช่วยให้มั่นใจว่าระบบคลังข้อมูลและระบบวิเคราะห์ได้รับการอัปเดตการเปลี่ยนแปลงล่าสุดจากฐานข้อมูลการดําเนินการอย่างต่อเนื่อง
ซึ่งช่วยให้ธุรกิจตัดสินใจได้โดยใช้ข้อมูลแบบเรียลไทม์
การกู้คืนข้อมูลหลังจากภัยพิบัติและความพร้อมใช้งานสูง
CDC ช่วยให้สามารถจำลองข้อมูลไปยังฐานข้อมูลรองแบบเรียลไทม์เพื่อวัตถุประสงค์ในการกู้คืนข้อมูลหลังจากเกิดภัยพิบัติ ในกรณีที่เกิดข้อผิดพลาด CDC จะช่วยให้ระบบเปลี่ยนเส้นทางไปยังฐานข้อมูลรองได้อย่างรวดเร็ว ซึ่งจะช่วยลดเวลาหยุดทำงานและการสูญเสียข้อมูล
การตรวจสอบและการปฏิบัติตามข้อกำหนด
CDC มีร่องรอยการตรวจสอบการเปลี่ยนแปลงข้อมูลโดยละเอียด ซึ่งจำเป็นต่อการปฏิบัติตามข้อกำหนดและข้อบังคับ
สิ่งที่คุณจะสร้าง
ในโค้ดแล็บนี้ คุณจะได้สร้างไปป์ไลน์ข้อมูลการบันทึกข้อมูลการเปลี่ยนแปลง (CDC) โดยใช้ Cloud Pub/Sub, Dataproc, Python และ Apache Spark ไปป์ไลน์จะทําดังนี้
- จำลองการเปลี่ยนแปลงฐานข้อมูลและเผยแพร่เป็นเหตุการณ์ไปยัง Cloud Pub/Sub ซึ่งเป็นบริการรับส่งข้อความที่ปรับขนาดได้และเชื่อถือได้
- ใช้ประโยชน์จากความสามารถของ Dataproc ซึ่งเป็นบริการ Spark และ Hadoop ที่มีการจัดการของ Google Cloud เพื่อประมวลผลเหตุการณ์เหล่านี้แบบเรียลไทม์
การเชื่อมต่อบริการเหล่านี้จะสร้างไปป์ไลน์ที่มีประสิทธิภาพซึ่งสามารถบันทึกและประมวลผลการเปลี่ยนแปลงข้อมูลที่เกิดขึ้นได้ ซึ่งจะเป็นรากฐานสําหรับการวิเคราะห์แบบเรียลไทม์ พื้นที่เก็บข้อมูล และแอปพลิเคชันอื่นๆ ที่สําคัญ
สิ่งที่คุณจะได้เรียนรู้
- วิธีสร้างไปป์ไลน์การบันทึกข้อมูลการเปลี่ยนแปลงพื้นฐาน
- Dataproc สำหรับการประมวลผลสตรีม
- Cloud Pub/Sub สําหรับการรับส่งข้อความแบบเรียลไทม์
- ข้อมูลเบื้องต้นเกี่ยวกับ Apache Spark
โค้ดแล็บนี้จะเน้นที่ Dataproc และ Cloud Pub/Sub แนวคิดและบล็อกโค้ดที่ไม่เกี่ยวข้องจะได้รับการอธิบายอย่างคร่าวๆ และให้คุณคัดลอกและวางได้ง่ายๆ
สิ่งที่ต้องมี
- บัญชี GCP ที่ใช้งานอยู่ซึ่งมีการตั้งค่าโปรเจ็กต์ หากยังไม่มีบัญชี คุณสามารถลงชื่อสมัครใช้ช่วงทดลองใช้ฟรีได้
- ติดตั้งและกำหนดค่า gcloud CLI แล้ว
- ติดตั้ง Python 3.7 ขึ้นไปเพื่อจำลองการเปลี่ยนแปลงฐานข้อมูลและการโต้ตอบกับ Pub/Sub
- ความรู้พื้นฐานเกี่ยวกับ Dataproc, Cloud Pub/Sub, Apache Spark และ Python
ก่อนจะเริ่มต้น
เรียกใช้คําสั่งต่อไปนี้ในเทอร์มินัลเพื่อเปิดใช้ API ที่จําเป็น
gcloud services enable \
dataproc.googleapis.com \
pubsub.googleapis.com \
2 ตั้งค่า Cloud Pub/Sub
สร้างหัวข้อ
ระบบจะใช้หัวข้อนี้เพื่อเผยแพร่การเปลี่ยนแปลงฐานข้อมูล งาน Dataproc จะเป็นผู้ใช้ข้อความเหล่านี้และจะประมวลผลข้อความสําหรับการเก็บรวบรวมข้อมูลการเปลี่ยนแปลง หากต้องการดูข้อมูลเพิ่มเติมเกี่ยวกับ Topics โปรดอ่านเอกสารประกอบอย่างเป็นทางการที่นี่
gcloud pubsub topics create database-changes
สร้างการสมัครใช้บริการ
สร้างการสมัครใช้บริการที่จะใช้รับข้อความใน Pub/Sub หากต้องการดูข้อมูลเพิ่มเติมเกี่ยวกับการติดตาม โปรดอ่านเอกสารประกอบอย่างเป็นทางการที่นี่
gcloud pubsub subscriptions create --topic database-changes change-subscriber
3 จำลองการเปลี่ยนแปลงฐานข้อมูล
ขั้นตอน
- สร้างสคริปต์ Python (เช่น
simulate_cdc.py
) เพื่อจำลองการเปลี่ยนแปลงฐานข้อมูลและเผยแพร่ไปยัง Pub/Sub
from google.cloud import pubsub_v1
import json
import time
import random
project_id = "YOUR_PROJECT_ID" # Replace with your GCP project ID
topic_id = "database-changes"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
def publish_message(data):
data_str = json.dumps(data).encode("utf-8")
future = publisher.publish(topic_path, data=data_str)
print(f"Published message ID: {future.result()}")
def simulate_change():
change_types = ["INSERT", "UPDATE", "DELETE"]
change_type = random.choice(change_types)
record_id = random.randint(1, 100)
timestamp = time.time()
change_data = {
"change_type": change_type,
"record_id": record_id,
"timestamp": timestamp,
"data": {"field1": "value1", "field2": "value2"}, #Place holder data.
}
publish_message(change_data)
if __name__ == "__main__":
while True:
simulate_change()
time.sleep(2) # Simulate changes every 2 seconds
แทนที่ YOUR_PROJECT_ID ด้วยรหัสโปรเจ็กต์ GCP จริง
- ติดตั้งไลบรารีไคลเอ็นต์ Pub/Sub
pip install google-cloud-pubsub
- เรียกใช้สคริปต์ในเทอร์มินัล สคริปต์นี้จะทํางานอย่างต่อเนื่องและเผยแพร่ข้อความไปยังหัวข้อ Pub/Sub ทุก 2 วินาที
python simulate_cdc.py
- หลังจากเรียกใช้สคริปต์เป็นเวลาประมาณ 1 นาที คุณจะมีข้อความใน Pub/Sub เพียงพอที่จะใช้ คุณสิ้นสุดสคริปต์ Python ที่ทำงานอยู่ได้โดยกด Ctrl + C หรือ Cmd + C ทั้งนี้ขึ้นอยู่กับระบบปฏิบัติการ
- ดูข้อความที่เผยแพร่แล้ว
เปิดเทอร์มินัลอีกเครื่องหนึ่งแล้วเรียกใช้คำสั่งต่อไปนี้เพื่อดูข้อความที่เผยแพร่
gcloud pubsub subscriptions pull --auto-ack change-subscriber
คุณควรเห็นแถวตารางที่มีข้อความและช่องอื่นๆ ดังนี้
{"change_type": "UPDATE", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}
คำอธิบาย
- สคริปต์ Python จะจำลองการเปลี่ยนแปลงฐานข้อมูลโดยการสุ่มสร้างเหตุการณ์
INSERT
,UPDATE
หรือDELETE
- การเปลี่ยนแปลงแต่ละรายการจะแสดงเป็นออบเจ็กต์ JSON ที่มีประเภทการเปลี่ยนแปลง รหัสระเบียน การประทับเวลา และข้อมูล
- สคริปต์ใช้คลังไลบรารีไคลเอ็นต์ Cloud Pub/Sub เพื่อเผยแพร่เหตุการณ์การเปลี่ยนแปลงเหล่านี้ไปยังหัวข้อ
database-changes
- คำสั่งผู้สมัครรับข้อมูลช่วยให้คุณดูข้อความที่ส่งไปยังหัวข้อ Pub/Sub ได้
4 สร้างบัญชีบริการสำหรับ Dataproc
ในส่วนนี้ คุณสร้างบัญชีบริการที่คลัสเตอร์ Dataproc ใช้ได้ นอกจากนี้ คุณยังกำหนดสิทธิ์ที่จำเป็นเพื่ออนุญาตให้อินสแตนซ์คลัสเตอร์เข้าถึง Cloud Pub/Sub และ Dataproc ได้ด้วย
- วิธีสร้างบัญชีบริการ
export SERVICE_ACCOUNT_NAME="dataproc-service-account"
gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
- เพิ่มบทบาทผู้ปฏิบัติงาน Dataproc เพื่ออนุญาตให้บัญชีบริการสร้างคลัสเตอร์และเรียกใช้งาน เพิ่มรหัสบัญชีบริการที่สร้างขึ้นจากคําสั่งก่อนหน้าเป็นสมาชิกในคําสั่งด้านล่าง
export PROJECT=$(gcloud info --format='value(config.project)')
gcloud projects add-iam-policy-binding $PROJECT \
--role roles/dataproc.worker \
--member="serviceAccount:<your-service-account-with-domain>"
- เพิ่มบทบาทผู้สมัครใช้บริการ Pub/Sub เพื่ออนุญาตให้บัญชีบริการสมัครใช้บริการ "change-subscriber" ของ Pub/Sub
gcloud beta pubsub subscriptions add-iam-policy-binding \
change-subscriber \
--role roles/pubsub.subscriber \
--member="serviceAccount:<your-service-account-with-domain"
5 สร้างคลัสเตอร์ Dataproc
คลัสเตอร์ Dataproc จะเรียกใช้แอป Spark ซึ่งจะประมวลผลข้อความใน Pub/Sub คุณจะต้องมีบัญชีบริการที่สร้างไว้ในส่วนก่อนหน้า Dataproc จะกำหนดบัญชีบริการนี้ให้กับทุกอินสแตนซ์ในคลัสเตอร์เพื่อให้อินสแตนซ์ทั้งหมดได้รับสิทธิ์ที่ถูกต้องในการเรียกใช้แอป
ใช้คําสั่งต่อไปนี้เพื่อสร้างคลัสเตอร์ Dataproc
gcloud dataproc clusters create cdc-dataproc-cluster \
--region=us-central1 \
--zone=us-central1-a \
--scopes=pubsub,datastore \
--image-version=1.3 \
--service-account="<your-service-account-with-domain-id>"
6 ส่งงาน Spark ไปยังคลัสเตอร์ Dataproc
แอปสตรีมมิง Spark จะประมวลผลข้อความการเปลี่ยนแปลงฐานข้อมูลใน Pub/Sub และพิมพ์ไปยังคอนโซล
ขั้นตอน
- สร้างไดเรกทอรีและเพิ่มซอร์สโค้ดของผู้บริโภคลงในไฟล์ PubsubConsumer.scala
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala
package demo
import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object PubsubConsumer {
def createContext(projectID: String, checkpointDirectory: String)
: StreamingContext = {
// [START stream_setup]
val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
val ssc = new StreamingContext(sparkConf, Seconds(1))
// Set the checkpoint directory
val yarnTags = sparkConf.get("spark.yarn.tags")
val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
ssc.checkpoint(checkpointDirectory + '/' + jobId)
// Create stream
val messagesStream: DStream[String] = PubsubUtils
.createStream(
ssc,
projectID,
None,
"change-subscriber", // Cloud Pub/Sub subscription for incoming database updates
SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
.map(message => new String(message.getData(), StandardCharsets.UTF_8))
// [END stream_setup]
processStringDStream(messagesStream)
ssc
}
def processStringDStream(stringDStream: DStream[String]): Unit = {
stringDStream.foreachRDD { rdd =>
if (!rdd.isEmpty()) {
val listOfStrings: List[String] = rdd.collect().toList
listOfStrings.foreach(str => println(s"message received: $str"))
} else {
println("looking for message...")
}
}
}
def main(args: Array[String]): Unit = {
if (args.length != 2) {
System.err.println("arguments are not passed correctly!")
System.exit(1)
}
val Seq(projectID, checkpointDirectory) = args.toSeq
// Create Spark context
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
() => createContext(projectID, checkpointDirectory))
// Start streaming until we receive an explicit termination
ssc.start()
ssc.awaitTermination()
}
}
- สร้างและเพิ่มข้อมูลต่อไปนี้ลงใน pom.xml
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<jvm.options.xms>-Xms512M</jvm.options.xms>
<jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
<jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
<jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
</properties>
<groupId>dataproc-spark-demos</groupId>
<artifactId>spark-streaming-pubsub-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-datastore</artifactId>
<version>1.34.0</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>spark-streaming-pubsub_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_2.11</artifactId>
<version>1.14.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.3.2</version>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
</goals>
<phase>compile</phase>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
<phase>test-compile</phase>
</execution>
<execution>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>demo.PubsubConsumer</mainClass>
</transformer>
</transformers>
<relocations>
<relocation>
<pattern>com</pattern>
<shadedPattern>repackaged.com</shadedPattern>
<includes>
<include>com.google.protobuf.**</include>
<include>com.google.common.**</include>
</includes>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
- เปลี่ยนเป็นไดเรกทอรี Spark ของโปรเจ็กต์และบันทึกเส้นทางไว้ในตัวแปรสภาพแวดล้อมเพื่อใช้ในภายหลัง
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
- เปลี่ยนไดเรกทอรี
cd $REPO_ROOT/spark
- ดาวน์โหลด Java 1.8 และวางโฟลเดอร์ใน /usr/lib/jvm/ จากนั้นเปลี่ยน JAVA_HOME ให้ชี้ไปยังตำแหน่งนี้
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
- สร้างไฟล์ jar ของแอปพลิเคชัน
mvn clean package
ระบบจะสร้างไฟล์เก็บถาวร spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar ที่มีโค้ดแอปพลิเคชันและไลบรารีที่เกี่ยวข้องในไดเรกทอรี spark/target
- ส่งใบสมัคร Spark โดยทำดังนี้
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"
gcloud dataproc jobs submit spark \
--cluster cdc-dataproc-cluster \
--region us-central1 \
--async \
--jar target/$JAR \
--max-failures-per-hour 10 \
--properties $SPARK_PROPERTIES \
-- $ARGUMENTS
- แสดงรายการงานที่ใช้งานอยู่และจดค่า
JOB_ID
ของงาน
gcloud dataproc jobs list --region=us-central1 --state-filter=active
เอาต์พุตจะมีลักษณะคล้ายกับ
JOB_ID TYPE STATUS
473ecb6d14e2483cb88a18988a5b2e56 spark RUNNING
- ดูเอาต์พุตของงานโดยเปิด URL ต่อไปนี้ในเบราว์เซอร์ แทนที่ [JOB_ID] ด้วยค่าที่บันทึกไว้ในขั้นตอนก่อนหน้า
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
- เอาต์พุตจะมีลักษณะคล้ายกับตัวอย่างต่อไปนี้
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
งานสตรีมมิง Spark ที่ทำงานใน Dataproc จะดึงข้อความจาก Pub/Sub ประมวลผล และแสดงผลลัพธ์ไปยังคอนโซล
- การสิ้นสุดงาน: เรียกใช้คําสั่งต่อไปนี้เพื่อสิ้นสุดงาน แทนที่ JOB_ID ด้วยรหัสเดียวกับที่เราจดไว้ก่อนหน้านี้
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet
ยินดีด้วย คุณเพิ่งสร้างไปป์ไลน์ CDC ที่มีประสิทธิภาพซึ่งบันทึกการเปลี่ยนแปลงฐานข้อมูลใน Pub/Sub และประมวลผลโดยใช้ Spark Streaming ที่ทำงานใน Cloud Dataproc
7 ล้างข้อมูล
ล้างข้อมูลทรัพยากรที่คุณสร้างขึ้นเพื่อไม่ให้ระบบเรียกเก็บเงินจากคุณในอนาคต วิธีที่ง่ายที่สุดในการหยุดการเรียกเก็บเงินคือการลบโปรเจ็กต์ที่คุณสร้างสำหรับบทแนะนำ หรือจะลบทรัพยากรแต่ละรายการก็ได้
เรียกใช้คำสั่งต่อไปนี้เพื่อลบทรัพยากรแต่ละรายการ
gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics delete database-changes --quiet
gcloud pubsub subscriptions delete change-subscriber --quiet
gcloud iam service-accounts delete <your-service-account-with-domain> --quiet
8 ขอแสดงความยินดี
ยินดีด้วย คุณเพิ่งทํา Codelab แบบลงมือปฏิบัติซึ่งสาธิตวิธีสร้างไปป์ไลน์ข้อมูลที่มีประสิทธิภาพแบบเรียลไทม์โดยใช้ Google Cloud Platform มาสรุปสิ่งที่คุณทําสําเร็จกัน
- การจําลอง Change Data Capture (CDC): คุณได้เรียนรู้พื้นฐานของ CDC และติดตั้งสคริปต์ Python เพื่อจําลองการเปลี่ยนแปลงฐานข้อมูล ซึ่งจะสร้างเหตุการณ์ที่แสดงการแก้ไขข้อมูลแบบเรียลไทม์
- ใช้ประโยชน์จาก Cloud Pub/Sub: คุณตั้งค่าหัวข้อและการสมัครใช้บริการ Cloud Pub/Sub ซึ่งให้บริการรับส่งข้อความที่ปรับขนาดได้และเชื่อถือได้สำหรับการสตรีมเหตุการณ์ CDC คุณได้เผยแพร่การเปลี่ยนแปลงฐานข้อมูลที่จําลองไปยัง Pub/Sub ซึ่งจะสร้างสตรีมข้อมูลแบบเรียลไทม์
- ข้อมูลที่ประมวลผลแล้วด้วย Dataproc และ Spark: คุณได้จัดสรรคลัสเตอร์ Dataproc และทำให้งาน Spark Streaming ใช้งานได้เพื่อรับข้อความจากการสมัครใช้บริการ Pub/Sub คุณได้ประมวลผลและเปลี่ยนรูปแบบเหตุการณ์ CDC ที่เข้ามาแบบเรียลไทม์ ซึ่งแสดงผลลัพธ์ในบันทึกงานของ Dataproc
- สร้างไปป์ไลน์แบบเรียลไทม์จากต้นทางถึงปลายทาง: คุณผสานรวมบริการเหล่านี้เพื่อสร้างไปป์ไลน์ข้อมูลที่สมบูรณ์ซึ่งบันทึก สตรีม และประมวลผลการเปลี่ยนแปลงข้อมูลแบบเรียลไทม์เรียบร้อยแล้ว คุณได้รับประสบการณ์จริงในการสร้างระบบที่จัดการสตรีมข้อมูลอย่างต่อเนื่องได้
- ใช้เครื่องมือเชื่อมต่อ Spark Pub/Sub: คุณกําหนดค่าคลัสเตอร์ Dataproc ให้ใช้เครื่องมือเชื่อมต่อ Spark Pub/Sub เรียบร้อยแล้ว ซึ่งสําคัญต่อ Structured Streaming ของ Spark ในการอ่านข้อมูลจาก Pub/Sub
ตอนนี้คุณมีรากฐานที่มั่นคงสําหรับการสร้างไปป์ไลน์ข้อมูลที่ซับซ้อนและล้ำสมัยมากขึ้นสําหรับแอปพลิเคชันต่างๆ ซึ่งรวมถึงการวิเคราะห์แบบเรียลไทม์ พื้นที่เก็บข้อมูล และสถาปัตยกรรมไมโครเซอร์วิส สำรวจและสร้างต่อไป