درباره این codelab
1. مقدمه
آخرین به روز رسانی: 2025-06-19
Change Data Capture چیست؟
Change Data Capture (CDC) مجموعه ای از الگوهای طراحی نرم افزار است که برای تعیین و ردیابی داده هایی که در یک پایگاه داده تغییر کرده اند استفاده می شود. به عبارت ساده تر، این روشی برای گرفتن و ثبت تغییرات ایجاد شده در داده ها است تا بتوان آن تغییرات را در سیستم های دیگر تکرار کرد.
Change Data Capture (CDC) در طیف وسیعی از سناریوهای مبتنی بر داده مانند مهاجرت داده، انبارداری و تجزیه و تحلیل دادهها در زمان واقعی، بازیابی فاجعه و دسترسی بالا، حسابرسی و انطباق و غیره بسیار مفید است.
مهاجرت داده ها
CDC پروژه های انتقال داده را با امکان انتقال تدریجی داده، کاهش زمان خرابی و به حداقل رساندن اختلال، ساده می کند.
انبارداری و تحلیل داده در زمان واقعی
CDC تضمین می کند که انبارهای داده و سیستم های تحلیلی به طور مداوم با آخرین تغییرات پایگاه داده های عملیاتی به روز می شوند.
این به کسب و کارها اجازه می دهد تا بر اساس اطلاعات لحظه ای تصمیم بگیرند.
بازیابی فاجعه و در دسترس بودن بالا
CDC امکان تکثیر بلادرنگ داده ها را در پایگاه های داده ثانویه برای اهداف بازیابی فاجعه فراهم می کند. در صورت خرابی، CDC امکان خرابی سریع به یک پایگاه داده ثانویه را فراهم می کند و زمان خرابی و از دست دادن داده را به حداقل می رساند.
حسابرسی و انطباق
CDC یک دنباله حسابرسی دقیق از تغییرات داده ها را ارائه می دهد که برای انطباق و الزامات قانونی ضروری است.
چیزی که خواهی ساخت
در این نرم افزار کد، شما می خواهید با استفاده از Cloud Pub/Sub، Dataproc، Python و Apache Spark یک خط لوله داده تغییر-گرفتن داده (CDC) بسازید. خط لوله شما:
- تغییرات پایگاه داده را شبیه سازی کنید و آنها را به عنوان رویداد در Cloud Pub/Sub، یک سرویس پیام رسانی مقیاس پذیر و قابل اعتماد منتشر کنید.
- از قدرت Dataproc، سرویس Spark و Hadoop مدیریت شده Google Cloud، برای پردازش این رویدادها در زمان واقعی استفاده کنید.
با اتصال این سرویسها، یک خط لوله قوی ایجاد میکنید که میتواند تغییرات دادهها را در زمان وقوع آنها ضبط و پردازش کند و پایهای برای تجزیه و تحلیل بلادرنگ، انبار دادهها و سایر برنامههای کاربردی حیاتی فراهم کند.
چیزی که یاد خواهید گرفت
- نحوه ایجاد یک خط لوله جمع آوری داده های تغییرات اساسی
- Dataproc برای پردازش جریان
- Cloud Pub/Sub برای پیام رسانی بلادرنگ
- اصول اولیه آپاچی اسپارک
این کد لبه روی Dataproc و Cloud Pub/Sub متمرکز شده است. مفاهیم غیر مرتبط و بلوکهای کد محو شدهاند و برای شما ارائه میشوند تا به سادگی کپی و جایگذاری کنید.
آنچه شما نیاز دارید
- یک حساب GCP فعال با راه اندازی پروژه. اگر ندارید، می توانید برای یک دوره آزمایشی رایگان ثبت نام کنید.
- gcloud CLI نصب و پیکربندی شد.
- Python 3.7+ برای شبیه سازی تغییرات پایگاه داده و تعامل با Pub/Sub نصب شده است.
- دانش پایه Dataproc، Cloud Pub/Sub، Apache Spark و Python.
قبل از شروع
برای فعال کردن API های مورد نیاز، دستور زیر را در ترمینال اجرا کنید:
gcloud services enable \
dataproc.googleapis.com \
pubsub.googleapis.com \
2. Cloud Pub/Sub را راه اندازی کنید
یک موضوع ایجاد کنید
از این موضوع برای انتشار تغییرات پایگاه داده استفاده خواهد شد. Dataproc job مصرف کننده این پیام ها خواهد بود و پیام ها را برای تغییر گرفتن داده ها پردازش می کند. اگر میخواهید درباره موضوعات بیشتر بدانید، میتوانید اسناد رسمی را اینجا بخوانید.
gcloud pubsub topics create database-changes
یک اشتراک ایجاد کنید
اشتراکی ایجاد کنید که برای مصرف پیام ها در Pub/Sub استفاده می شود. برای اطلاعات بیشتر در مورد اشتراکها، میتوانید اسناد رسمی را اینجا بخوانید.
gcloud pubsub subscriptions create --topic database-changes change-subscriber
3. شبیه سازی تغییرات پایگاه داده
مراحل
- یک اسکریپت پایتون (به عنوان مثال
simulate_cdc.py
) برای شبیه سازی تغییرات پایگاه داده و انتشار آنها در Pub/Sub ایجاد کنید.
from google.cloud import pubsub_v1
import json
import time
import random
project_id = "YOUR_PROJECT_ID" # Replace with your GCP project ID
topic_id = "database-changes"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
def publish_message(data):
data_str = json.dumps(data).encode("utf-8")
future = publisher.publish(topic_path, data=data_str)
print(f"Published message ID: {future.result()}")
def simulate_change():
change_types = ["INSERT", "UPDATE", "DELETE"]
change_type = random.choice(change_types)
record_id = random.randint(1, 100)
timestamp = time.time()
change_data = {
"change_type": change_type,
"record_id": record_id,
"timestamp": timestamp,
"data": {"field1": "value1", "field2": "value2"}, #Place holder data.
}
publish_message(change_data)
if __name__ == "__main__":
while True:
simulate_change()
time.sleep(2) # Simulate changes every 2 seconds
شناسه پروژه GCP واقعی خود را جایگزین YOUR_PROJECT_ID کنید
- کتابخانه Pub/Sub Client را نصب کنید:
pip install google-cloud-pubsub
- اسکریپت را روی ترمینال خود اجرا کنید. این اسکریپت به طور مداوم اجرا می شود و هر 2 ثانیه یک بار پیام هایی را به موضوع Pub/Sub منتشر می کند.
python simulate_cdc.py
- پس از اجرای اسکریپت برای فرض کنید 1 دقیقه، پیام های کافی در Pub/Sub برای مصرف خواهید داشت. بسته به سیستم عامل خود می توانید اسکریپت پایتون در حال اجرا را با فشار دادن ctrl + C یا Cmd + C خاتمه دهید.
- مشاهده پیام های منتشر شده:
ترمینال دیگری را باز کنید و دستور زیر را برای مشاهده پیام های منتشر شده اجرا کنید:
gcloud pubsub subscriptions pull --auto-ack change-subscriber
شما باید یک ردیف جدول حاوی پیام و سایر فیلدها را ببینید:
{"change_type": "UPDATE"، "record_id": 10، "timestamp": 1742466264.888465، "data": {"field1": "value1"، "field2": "value2"}}
توضیح
- اسکریپت پایتون با ایجاد تصادفی رویدادهای
INSERT
،UPDATE
یاDELETE
، تغییرات پایگاه داده را شبیه سازی می کند. - هر تغییر به عنوان یک شی JSON نشان داده می شود که شامل نوع تغییر، شناسه رکورد، مهر زمانی و داده است.
- این اسکریپت از کتابخانه سرویس گیرنده Cloud Pub/Sub برای انتشار این رویدادهای تغییر در مبحث
database-changes
استفاده می کند. - دستور subscriber به شما امکان می دهد پیام هایی را که به موضوع pub/sub ارسال می شوند را مشاهده کنید.
4. یک حساب سرویس برای Dataproc ایجاد کنید
در این بخش، یک حساب سرویس ایجاد می کنید که خوشه Dataproc می تواند از آن استفاده کند. شما همچنین مجوزهای لازم را برای اجازه به نمونه های کلاستر برای دسترسی به Cloud Pub/sub و Dataproc اختصاص می دهید.
- ایجاد یک حساب خدمات:
export SERVICE_ACCOUNT_NAME="dataproc-service-account"
gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
- نقش کارمند Dataproc را اضافه کنید تا به حساب سرویس اجازه دهید خوشهها ایجاد کند و کارها را اجرا کند. شناسه حساب سرویس تولید شده در دستور قبلی را به عنوان عضو در دستور زیر اضافه کنید:
export PROJECT=$(gcloud info --format='value(config.project)')
gcloud projects add-iam-policy-binding $PROJECT \
--role roles/dataproc.worker \
--member="serviceAccount:<your-service-account-with-domain>"
- نقش مشترک Pub/sub را اضافه کنید تا به حساب سرویس اجازه دهید در اشتراک Pub/sub "تغییر-مشترک" مشترک شود:
gcloud beta pubsub subscriptions add-iam-policy-binding \
change-subscriber \
--role roles/pubsub.subscriber \
--member="serviceAccount:<your-service-account-with-domain"
5. یک خوشه Dataproc ایجاد کنید
خوشه Dataproc برنامه spark را اجرا می کند که پیام ها را در Pub/sub پردازش می کند. شما به حساب سرویس ایجاد شده در بخش قبلی نیاز دارید. Dataproc این حساب سرویس را به هر نمونه در خوشه اختصاص می دهد تا همه نمونه ها مجوزهای صحیح را برای اجرای برنامه دریافت کنند.
برای ایجاد یک کلاستر Dataproc از دستور زیر استفاده کنید:
gcloud dataproc clusters create cdc-dataproc-cluster \
--region=us-central1 \
--zone=us-central1-a \
--scopes=pubsub,datastore \
--image-version=1.3 \
--service-account="<your-service-account-with-domain-id>"
6. Spark Job را به Dataproc Cluster ارسال کنید
برنامه استریم جرقه پیامهای تغییر پایگاه داده را در Pub/sub پردازش میکند و آنها را در کنسول چاپ میکند.
مراحل
- یک دایرکتوری ایجاد کنید و کد منبع مصرف کننده را به فایل PubsubConsumer.scala اضافه کنید
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala
package demo
import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object PubsubConsumer {
def createContext(projectID: String, checkpointDirectory: String)
: StreamingContext = {
// [START stream_setup]
val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
val ssc = new StreamingContext(sparkConf, Seconds(1))
// Set the checkpoint directory
val yarnTags = sparkConf.get("spark.yarn.tags")
val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
ssc.checkpoint(checkpointDirectory + '/' + jobId)
// Create stream
val messagesStream: DStream[String] = PubsubUtils
.createStream(
ssc,
projectID,
None,
"change-subscriber", // Cloud Pub/Sub subscription for incoming database updates
SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
.map(message => new String(message.getData(), StandardCharsets.UTF_8))
// [END stream_setup]
processStringDStream(messagesStream)
ssc
}
def processStringDStream(stringDStream: DStream[String]): Unit = {
stringDStream.foreachRDD { rdd =>
if (!rdd.isEmpty()) {
val listOfStrings: List[String] = rdd.collect().toList
listOfStrings.foreach(str => println(s"message received: $str"))
} else {
println("looking for message...")
}
}
}
def main(args: Array[String]): Unit = {
if (args.length != 2) {
System.err.println("arguments are not passed correctly!")
System.exit(1)
}
val Seq(projectID, checkpointDirectory) = args.toSeq
// Create Spark context
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
() => createContext(projectID, checkpointDirectory))
// Start streaming until we receive an explicit termination
ssc.start()
ssc.awaitTermination()
}
}
- موارد زیر را ایجاد کرده و به pom.xml اضافه کنید
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<jvm.options.xms>-Xms512M</jvm.options.xms>
<jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
<jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
<jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
</properties>
<groupId>dataproc-spark-demos</groupId>
<artifactId>spark-streaming-pubsub-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-datastore</artifactId>
<version>1.34.0</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>spark-streaming-pubsub_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_2.11</artifactId>
<version>1.14.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.3.2</version>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
</goals>
<phase>compile</phase>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
<phase>test-compile</phase>
</execution>
<execution>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>demo.PubsubConsumer</mainClass>
</transformer>
</transformers>
<relocations>
<relocation>
<pattern>com</pattern>
<shadedPattern>repackaged.com</shadedPattern>
<includes>
<include>com.google.protobuf.**</include>
<include>com.google.common.**</include>
</includes>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
- به دایرکتوری spark پروژه تغییر دهید و مسیر را در یک متغیر محیطی ذخیره کنید تا بعدا استفاده شود:
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
- دایرکتوری را تغییر دهید:
cd $REPO_ROOT/spark
- جاوا 1.8 را دانلود کنید و پوشه را در /usr/lib/jvm/ قرار دهید. سپس JAVA_HOME خود را برای اشاره به این تغییر دهید:
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
- jar برنامه را بسازید
mvn clean package
بایگانی spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar حاوی کد برنامه و وابستگی ها در دایرکتوری spark/target
ایجاد شده است.
- درخواست اسپارک را ارسال کنید:
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"
gcloud dataproc jobs submit spark \
--cluster cdc-dataproc-cluster \
--region us-central1 \
--async \
--jar target/$JAR \
--max-failures-per-hour 10 \
--properties $SPARK_PROPERTIES \
-- $ARGUMENTS
- لیست مشاغل فعال را نمایش دهید و مقدار
JOB_ID
را برای آن شغل یادداشت کنید:
gcloud dataproc jobs list --region=us-central1 --state-filter=active
خروجی شبیه به
JOB_ID TYPE STATUS
473ecb6d14e2483cb88a18988a5b2e56 spark RUNNING
- با باز کردن URL زیر در مرورگر خود، خروجی کار را مشاهده کنید. [JOB_ID] را با مقدار ذکر شده در مرحله قبل جایگزین کنید.
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
- خروجی مشابه موارد زیر است:
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
کار استریم جرقه ای که در Dataproc اجرا می شود، پیام ها را از Pub/sub می کشد، آنها را پردازش می کند و خروجی را به کنسول نمایش می دهد.
- خاتمه کار: دستور زیر را برای خاتمه کار اجرا کنید. جایگزینی JOB_ID با همان موردی است که قبلاً اشاره کردیم
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet
تبریک میگم شما به تازگی یک خط لوله CDC قدرتمند ایجاد کرده اید که تغییرات پایگاه داده را در Pub/sub ضبط می کند و آنها را با استفاده از جریان جرقه در حال اجرا در Cloud Dataproc پردازش می کند.
7. پاک کن
منابعی را که ایجاد کرده اید پاک کنید تا در آینده برای آن ها صورتحساب دریافت نکنید. ساده ترین راه برای حذف صورتحساب، حذف پروژه ای است که برای آموزش ایجاد کرده اید. از طرف دیگر، می توانید منابع فردی را حذف کنید.
دستورات زیر را برای حذف منابع فردی اجرا کنید
gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics delete database-changes --quiet
gcloud pubsub subscriptions delete change-subscriber --quiet
gcloud iam service-accounts delete <your-service-account-with-domain> --quiet
8. تبریک میگم
تبریک میگوییم، شما بهتازگی یک کد آزمایشگاهی عملی را تکمیل کردهاید که نشان میدهد چگونه میتوان یک خط لوله داده در زمان واقعی با استفاده از Google Cloud Platform ایجاد کرد. بیایید آنچه را که به دست آوردهاید خلاصه کنیم:
- شبیهسازی تغییر دادهها (CDC): شما اصول CDC را یاد گرفتید و یک اسکریپت پایتون را برای شبیهسازی تغییرات پایگاهداده پیادهسازی کردید، و رویدادهایی را ایجاد کرد که نشاندهنده تغییرات دادهها در زمان واقعی است.
- Leveraged Cloud Pub/Sub: موضوعات و اشتراکهای Cloud Pub/Sub را تنظیم میکنید و یک سرویس پیامرسانی مقیاسپذیر و قابل اعتماد برای پخش رویدادهای CDC خود ارائه میکنید. شما تغییرات پایگاه داده شبیه سازی شده خود را به Pub/Sub منتشر کردید و یک جریان داده در زمان واقعی ایجاد کردید.
- داده های پردازش شده با Dataproc و Spark: شما یک خوشه Dataproc تهیه کردید و یک کار جریانی Spark را برای مصرف پیام های اشتراک Pub/Sub خود به کار بردید. شما رویدادهای CDC ورودی را در زمان واقعی پردازش و تبدیل کردید، و نتایج را در گزارشهای کار Dataproc خود نمایش میدهید.
- ساخت یک خط لوله زمان واقعی از انتها به انتها: شما با موفقیت این خدمات را برای ایجاد یک خط لوله داده کامل که تغییرات داده را در زمان واقعی ضبط، جریان و پردازش می کند، ادغام کردید. شما تجربه عملی در ساختن سیستمی به دست آوردید که بتواند جریان های داده پیوسته را مدیریت کند.
- از اتصال دهنده Spark Pub/Sub استفاده کردید: شما با موفقیت یک خوشه Dataproc را برای استفاده از اتصال Spark Pub/Sub پیکربندی کردید، که برای Spark Structured Streaming برای خواندن داده ها از Pub/Sub بسیار مهم است.
اکنون شما یک پایه محکم برای ایجاد خطوط لوله داده پیچیده تر و پیچیده تر برای برنامه های مختلف، از جمله تجزیه و تحلیل بلادرنگ، انبار داده ها، و معماری های میکروسرویس دارید. به کاوش و ساختن ادامه دهید!