۱. مقدمه

آخرین بهروزرسانی: 2025-06-19
ثبت دادههای تغییر چیست؟
ثبت تغییرات دادهها (CDC) مجموعهای از الگوهای طراحی نرمافزار است که برای تعیین و ردیابی دادههایی که در یک پایگاه داده تغییر کردهاند، استفاده میشود. به عبارت سادهتر، این روشی برای ثبت و ضبط تغییرات ایجاد شده در دادهها است تا بتوان این تغییرات را در سیستمهای دیگر تکرار کرد.
ثبت دادههای تغییر (CDC) در طیف گستردهای از سناریوهای دادهمحور مانند مهاجرت دادهها، انبارداری و تجزیه و تحلیل دادههای بلادرنگ، بازیابی اطلاعات و دسترسیپذیری بالا، حسابرسی و انطباق و غیره فوقالعاده مفید است.
مهاجرت داده
CDC با فراهم کردن امکان انتقال تدریجی دادهها، کاهش زمان از کارافتادگی و به حداقل رساندن اختلال، پروژههای مهاجرت دادهها را ساده میکند.
انبارداری و تحلیل دادههای بلادرنگ
CDC تضمین میکند که انبارهای داده و سیستمهای تحلیلی دائماً با آخرین تغییرات پایگاههای داده عملیاتی بهروز میشوند.
این امر به کسبوکارها اجازه میدهد تا بر اساس اطلاعات بلادرنگ تصمیمگیری کنند.
بازیابی پس از سانحه و دسترسیپذیری بالا
CDC امکان کپی کردن دادهها به صورت بلادرنگ (Real-time) را در پایگاههای داده ثانویه برای اهداف بازیابی پس از فاجعه فراهم میکند. در صورت بروز خرابی، CDC امکان انتقال سریع به پایگاه داده ثانویه را فراهم میکند و زمان از کار افتادگی و از دست رفتن دادهها را به حداقل میرساند.
حسابرسی و انطباق
CDC یک مسیر حسابرسی دقیق از تغییرات دادهها ارائه میدهد که برای انطباق با الزامات قانونی و نظارتی ضروری است.
آنچه خواهید ساخت
در این آزمایشگاه کد، شما قصد دارید با استفاده از Cloud Pub/Sub، Dataproc، Python و Apache Spark یک خط لوله داده تغییر-داده-ضبط (CDC) بسازید. خط لوله شما:
- تغییرات پایگاه داده را شبیهسازی کنید و آنها را به عنوان رویدادها در Cloud Pub/Sub، یک سرویس پیامرسانی مقیاسپذیر و قابل اعتماد، منتشر کنید.
- از قدرت Dataproc، سرویس مدیریتشدهی Spark و Hadoop گوگل کلود، برای پردازش این رویدادها به صورت بلادرنگ بهره ببرید.
با اتصال این سرویسها، شما یک خط لوله قوی ایجاد خواهید کرد که قادر به ثبت و پردازش تغییرات دادهها در حین وقوع است و پایه و اساسی را برای تجزیه و تحلیل بلادرنگ، انبار دادهها و سایر برنامههای حیاتی فراهم میکند.
آنچه یاد خواهید گرفت
- نحوه ایجاد یک خط لوله ثبت دادههای تغییر اساسی
- Dataproc برای پردازش جریانی
- Cloud Pub/Sub برای پیامرسانی بلادرنگ
- مبانی آپاچی اسپارک
این آزمایشگاه کد بر روی Dataproc و Cloud Pub/Sub تمرکز دارد. مفاهیم و بلوکهای کد نامربوط نادیده گرفته شدهاند و برای کپی و پیست ساده در اختیار شما قرار گرفتهاند.
آنچه نیاز دارید
- یک حساب کاربری GCP فعال با پروژه راهاندازی شده. اگر حساب کاربری ندارید، میتوانید برای یک دوره آزمایشی رایگان ثبتنام کنید.
- رابط خط فرمان gcloud نصب و پیکربندی شد.
- پایتون ۳.۷+ برای شبیهسازی تغییرات پایگاه داده و تعامل با Pub/Sub نصب شده باشد.
- آشنایی اولیه با Dataproc، Cloud Pub/Sub، Apache Spark و پایتون.
قبل از شروع
برای فعال کردن API های مورد نیاز، دستور زیر را در ترمینال اجرا کنید:
gcloud services enable \
dataproc.googleapis.com \
pubsub.googleapis.com \
۲. راهاندازی Cloud Pub/Sub
ایجاد یک موضوع
این تاپیک برای انتشار تغییرات پایگاه داده استفاده خواهد شد. وظیفه Dataproc مصرفکننده این پیامها خواهد بود و پیامها را برای ثبت دادههای تغییر پردازش میکند. اگر میخواهید درباره تاپیکها بیشتر بدانید، میتوانید مستندات رسمی را اینجا بخوانید.
gcloud pubsub topics create database-changes
ایجاد اشتراک
یک اشتراک ایجاد کنید که برای استفاده از پیامها در Pub/Sub استفاده خواهد شد. برای کسب اطلاعات بیشتر در مورد اشتراکها، میتوانید مستندات رسمی را اینجا بخوانید.
gcloud pubsub subscriptions create --topic database-changes change-subscriber
۳. شبیهسازی تغییرات پایگاه داده
مراحل
- یک اسکریپت پایتون (مثلاً
simulate_cdc.py) ایجاد کنید تا تغییرات پایگاه داده را شبیهسازی کرده و آنها را در Pub/Sub منتشر کند.
from google.cloud import pubsub_v1
import json
import time
import random
project_id = "YOUR_PROJECT_ID" # Replace with your GCP project ID
topic_id = "database-changes"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
def publish_message(data):
data_str = json.dumps(data).encode("utf-8")
future = publisher.publish(topic_path, data=data_str)
print(f"Published message ID: {future.result()}")
def simulate_change():
change_types = ["INSERT", "UPDATE", "DELETE"]
change_type = random.choice(change_types)
record_id = random.randint(1, 100)
timestamp = time.time()
change_data = {
"change_type": change_type,
"record_id": record_id,
"timestamp": timestamp,
"data": {"field1": "value1", "field2": "value2"}, #Place holder data.
}
publish_message(change_data)
if __name__ == "__main__":
while True:
simulate_change()
time.sleep(2) # Simulate changes every 2 seconds
YOUR_PROJECT_ID را با شناسه پروژه GCP واقعی خود جایگزین کنید.
- کتابخانه کلاینت Pub/Sub را نصب کنید:
pip install google-cloud-pubsub
- اسکریپت را در ترمینال خود اجرا کنید. این اسکریپت به طور مداوم اجرا میشود و هر ۲ ثانیه یک بار پیامهایی را در تاپیک Pub/Sub منتشر میکند.
python simulate_cdc.py
- بعد از اجرای اسکریپت به مدت مثلاً ۱ دقیقه، به اندازه کافی پیام در Pub/Sub برای مصرف خواهید داشت. میتوانید بسته به سیستم عامل خود، با فشار دادن کلیدهای ctrl + C یا Cmd + C، اجرای اسکریپت پایتون را خاتمه دهید.
- مشاهده پیامهای منتشر شده:
یک ترمینال دیگر باز کنید و دستور زیر را برای مشاهده پیامهای منتشر شده اجرا کنید:
gcloud pubsub subscriptions pull --auto-ack change-subscriber
شما باید یک ردیف جدول حاوی پیام و فیلدهای دیگر را ببینید:
{"change_type": "UPDATE", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}
توضیح
- اسکریپت پایتون با تولید تصادفی رویدادهای
INSERT،UPDATEیاDELETE، تغییرات پایگاه داده را شبیهسازی میکند. - هر تغییر به صورت یک شیء JSON نمایش داده میشود که شامل نوع تغییر، شناسه رکورد، مهر زمانی و دادهها است.
- این اسکریپت از کتابخانه Cloud Pub/Sub client برای انتشار این رویدادهای تغییر در موضوع
database-changesاستفاده میکند. - دستور subscriber به شما امکان میدهد پیامهایی را که به موضوع pub/sub ارسال میشوند، مشاهده کنید.
۴. یک حساب کاربری سرویس برای Dataproc ایجاد کنید
در این بخش، شما یک حساب کاربری Service ایجاد میکنید که کلاستر Dataproc میتواند از آن استفاده کند. همچنین مجوزهای لازم را برای دسترسی نمونههای کلاستر به Cloud Pub/sub و Dataproc اختصاص میدهید.
- ایجاد حساب کاربری سرویس:
export SERVICE_ACCOUNT_NAME="dataproc-service-account"
gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
- نقش کارگر Dataproc را اضافه کنید تا به حساب سرویس اجازه ایجاد خوشهها و اجرای کارها را بدهید. شناسه حساب سرویس تولید شده در دستور قبلی را به عنوان عضو در دستور زیر اضافه کنید:
export PROJECT=$(gcloud info --format='value(config.project)')
gcloud projects add-iam-policy-binding $PROJECT \
--role roles/dataproc.worker \
--member="serviceAccount:<your-service-account-with-domain>"
- نقش Pub/sub subscriber را اضافه کنید تا به حساب سرویس اجازه دهید در اشتراک Pub/sub از نوع "change-subscriber" مشترک شود:
gcloud beta pubsub subscriptions add-iam-policy-binding \
change-subscriber \
--role roles/pubsub.subscriber \
--member="serviceAccount:<your-service-account-with-domain"
۵. ایجاد یک کلاستر Dataproc
کلاستر Dataproc برنامهی spark را اجرا خواهد کرد که پیامها را در Pub/sub پردازش خواهد کرد. شما به حساب کاربری سرویس ایجاد شده در بخش قبل نیاز دارید. Dataproc این حساب کاربری سرویس را به هر نمونه در کلاستر اختصاص میدهد تا تمام نمونهها مجوزهای صحیح برای اجرای برنامه را دریافت کنند.
برای ایجاد یک کلاستر Dataproc از دستور زیر استفاده کنید:
gcloud dataproc clusters create cdc-dataproc-cluster \
--region=us-central1 \
--zone=us-central1-a \
--scopes=pubsub,datastore \
--image-version=1.3 \
--service-account="<your-service-account-with-domain-id>"
۶. ارسال کار Spark به خوشه Dataproc
برنامهی استریمینگ اسپارک، پیامهای تغییر پایگاه داده را در Pub/sub پردازش کرده و آنها را در کنسول چاپ میکند.
مراحل
- یک دایرکتوری ایجاد کنید و کد منبع مصرفکننده را به فایل PubsubConsumer.scala اضافه کنید.
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala
package demo
import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object PubsubConsumer {
def createContext(projectID: String, checkpointDirectory: String)
: StreamingContext = {
// [START stream_setup]
val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
val ssc = new StreamingContext(sparkConf, Seconds(1))
// Set the checkpoint directory
val yarnTags = sparkConf.get("spark.yarn.tags")
val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
ssc.checkpoint(checkpointDirectory + '/' + jobId)
// Create stream
val messagesStream: DStream[String] = PubsubUtils
.createStream(
ssc,
projectID,
None,
"change-subscriber", // Cloud Pub/Sub subscription for incoming database updates
SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
.map(message => new String(message.getData(), StandardCharsets.UTF_8))
// [END stream_setup]
processStringDStream(messagesStream)
ssc
}
def processStringDStream(stringDStream: DStream[String]): Unit = {
stringDStream.foreachRDD { rdd =>
if (!rdd.isEmpty()) {
val listOfStrings: List[String] = rdd.collect().toList
listOfStrings.foreach(str => println(s"message received: $str"))
} else {
println("looking for message...")
}
}
}
def main(args: Array[String]): Unit = {
if (args.length != 2) {
System.err.println("arguments are not passed correctly!")
System.exit(1)
}
val Seq(projectID, checkpointDirectory) = args.toSeq
// Create Spark context
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
() => createContext(projectID, checkpointDirectory))
// Start streaming until we receive an explicit termination
ssc.start()
ssc.awaitTermination()
}
}
- فایل زیر را ایجاد و به pom.xml اضافه کنید.
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<jvm.options.xms>-Xms512M</jvm.options.xms>
<jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
<jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
<jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
</properties>
<groupId>dataproc-spark-demos</groupId>
<artifactId>spark-streaming-pubsub-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-datastore</artifactId>
<version>1.34.0</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>spark-streaming-pubsub_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_2.11</artifactId>
<version>1.14.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.3.2</version>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
</goals>
<phase>compile</phase>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
<phase>test-compile</phase>
</execution>
<execution>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>demo.PubsubConsumer</mainClass>
</transformer>
</transformers>
<relocations>
<relocation>
<pattern>com</pattern>
<shadedPattern>repackaged.com</shadedPattern>
<includes>
<include>com.google.protobuf.**</include>
<include>com.google.common.**</include>
</includes>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
- به دایرکتوری spark پروژه بروید و مسیر را در یک متغیر محیطی ذخیره کنید تا بعداً از آن استفاده شود:
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
- تغییر دایرکتوری:
cd $REPO_ROOT/spark
- جاوا ۱.۸ را دانلود کنید و پوشه را در /usr/lib/jvm/ قرار دهید. سپس JAVA_HOME خود را به این تغییر دهید:
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
- ساخت فایل jar برنامه
mvn clean package
فایل آرشیو spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar که شامل کد برنامه و وابستگیهای آن است، در دایرکتوری spark/target ایجاد میشود.
- درخواست اسپارک را ارسال کنید:
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"
gcloud dataproc jobs submit spark \
--cluster cdc-dataproc-cluster \
--region us-central1 \
--async \
--jar target/$JAR \
--max-failures-per-hour 10 \
--properties $SPARK_PROPERTIES \
-- $ARGUMENTS
- لیست مشاغل فعال را نمایش داده و مقدار
JOB_IDرا برای شغل یادداشت کنید:
gcloud dataproc jobs list --region=us-central1 --state-filter=active
خروجی شبیه به این خواهد بود:
JOB_ID TYPE STATUS
473ecb6d14e2483cb88a18988a5b2e56 spark RUNNING
- با باز کردن URL زیر در مرورگر خود، خروجی کار را مشاهده کنید. [JOB_ID] را با مقداری که در مرحله قبل مشخص شده است، جایگزین کنید.
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
- خروجی مشابه زیر است:
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
کار استریمینگ اسپارک که در Dataproc اجرا میشود، پیامها را از Pub/sub دریافت میکند، آنها را پردازش میکند و خروجی را در کنسول نمایش میدهد.
- خاتمه دادن به کار: دستور زیر را برای خاتمه دادن به کار اجرا کنید. JOB_ID را با همان موردی که قبلاً یادداشت کردیم جایگزین کنید.
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet
تبریک! شما به تازگی یک خط لوله CDC قدرتمند ایجاد کردهاید که تغییرات پایگاه داده را در Pub/sub ثبت کرده و آنها را با استفاده از جریان جرقهای که در Cloud Dataproc اجرا میشود، پردازش میکند.
۷. تمیز کردن
منابعی را که ایجاد کردهاید، پاک کنید تا در آینده بابت آنها هزینهای از شما دریافت نشود. سادهترین راه برای حذف هزینه، حذف پروژهای است که برای آموزش ایجاد کردهاید. به عنوان یک روش جایگزین، میتوانید منابع را به صورت جداگانه حذف کنید.
برای حذف منابع تکی، دستورات زیر را اجرا کنید
gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics delete database-changes --quiet
gcloud pubsub subscriptions delete change-subscriber --quiet
gcloud iam service-accounts delete <your-service-account-with-domain> --quiet
۸. تبریک
تبریک میگویم، شما به تازگی یک آزمایشگاه کد عملی را به پایان رساندهاید که نحوه ساخت یک خط لوله داده بلادرنگ قوی با استفاده از پلتفرم ابری گوگل را نشان میدهد. بیایید خلاصهای از آنچه انجام دادهاید را مرور کنیم:
- ثبت دادههای تغییر شبیهسازیشده (CDC): شما اصول اولیه CDC را یاد گرفتید و یک اسکریپت پایتون برای شبیهسازی تغییرات پایگاه داده پیادهسازی کردید و رویدادهایی را ایجاد کردید که نشاندهنده تغییرات دادههای بلادرنگ هستند.
- انتشار/ذخیره ابری قدرتمند: شما موضوعات و اشتراکهای انتشار/ذخیره ابری را تنظیم کردید و یک سرویس پیامرسانی مقیاسپذیر و قابل اعتماد برای پخش رویدادهای CDC خود فراهم کردید. شما تغییرات شبیهسازیشده پایگاه داده خود را در انتشار/ذخیره منتشر کردید و یک جریان داده بلادرنگ ایجاد کردید.
- دادههای پردازششده با Dataproc و Spark: شما یک کلاستر Dataproc را آمادهسازی کردید و یک کار Spark Streaming را برای مصرف پیامهای اشتراک Pub/Sub خود مستقر کردید. رویدادهای CDC ورودی را بهصورت بلادرنگ پردازش و تبدیل کردید و نتایج را در گزارشهای کار Dataproc خود نمایش دادید.
- ساخت یک خط لوله داده بلادرنگ سرتاسری: شما با موفقیت این سرویسها را برای ایجاد یک خط لوله داده کامل که تغییرات دادهها را به صورت بلادرنگ ثبت، استریم و پردازش میکند، ادغام کردید. شما تجربه عملی در ساخت سیستمی که بتواند جریانهای داده پیوسته را مدیریت کند، کسب کردهاید.
- استفاده از رابط Pub/Sub اسپارک: شما با موفقیت یک کلاستر Dataproc را برای استفاده از رابط Pub/Sub اسپارک پیکربندی کردید، که برای Spark Structured Streaming جهت خواندن دادهها از Pub/Sub بسیار مهم است.
اکنون شما پایه محکمی برای ساخت خطوط داده پیچیدهتر و ماهرانهتر برای برنامههای مختلف، از جمله تجزیه و تحلیل بلادرنگ، انبار دادهها و معماریهای میکروسرویس دارید. به کاوش و ساخت ادامه دهید!