ثبت تغییر ساختمان با استفاده از Dataproc و Cloud Pub/Sub

ثبت تغییر ساختمان با استفاده از Dataproc و Cloud Pub/Sub

درباره این codelab

subjectآخرین به‌روزرسانی: ژوئن ۱۹, ۲۰۲۵
account_circleنویسنده: Jatin Narula

1. مقدمه

df8070bd84336207.png

آخرین به روز رسانی: 2025-06-19

Change Data Capture چیست؟

Change Data Capture (CDC) مجموعه ای از الگوهای طراحی نرم افزار است که برای تعیین و ردیابی داده هایی که در یک پایگاه داده تغییر کرده اند استفاده می شود. به عبارت ساده تر، این روشی برای گرفتن و ثبت تغییرات ایجاد شده در داده ها است تا بتوان آن تغییرات را در سیستم های دیگر تکرار کرد.

Change Data Capture (CDC) در طیف وسیعی از سناریوهای مبتنی بر داده مانند مهاجرت داده، انبارداری و تجزیه و تحلیل داده‌ها در زمان واقعی، بازیابی فاجعه و دسترسی بالا، حسابرسی و انطباق و غیره بسیار مفید است.

مهاجرت داده ها

CDC پروژه های انتقال داده را با امکان انتقال تدریجی داده، کاهش زمان خرابی و به حداقل رساندن اختلال، ساده می کند.

انبارداری و تحلیل داده در زمان واقعی

CDC تضمین می کند که انبارهای داده و سیستم های تحلیلی به طور مداوم با آخرین تغییرات پایگاه داده های عملیاتی به روز می شوند.

این به کسب و کارها اجازه می دهد تا بر اساس اطلاعات لحظه ای تصمیم بگیرند.

بازیابی فاجعه و در دسترس بودن بالا

CDC امکان تکثیر بلادرنگ داده ها را در پایگاه های داده ثانویه برای اهداف بازیابی فاجعه فراهم می کند. در صورت خرابی، CDC امکان خرابی سریع به یک پایگاه داده ثانویه را فراهم می کند و زمان خرابی و از دست دادن داده را به حداقل می رساند.

حسابرسی و انطباق

CDC یک دنباله حسابرسی دقیق از تغییرات داده ها را ارائه می دهد که برای انطباق و الزامات قانونی ضروری است.

چیزی که خواهی ساخت

در این نرم افزار کد، شما می خواهید با استفاده از Cloud Pub/Sub، Dataproc، Python و Apache Spark یک خط لوله داده تغییر-گرفتن داده (CDC) بسازید. خط لوله شما:

  • تغییرات پایگاه داده را شبیه سازی کنید و آنها را به عنوان رویداد در Cloud Pub/Sub، یک سرویس پیام رسانی مقیاس پذیر و قابل اعتماد منتشر کنید.
  • از قدرت Dataproc، سرویس Spark و Hadoop مدیریت شده Google Cloud، برای پردازش این رویدادها در زمان واقعی استفاده کنید.

با اتصال این سرویس‌ها، یک خط لوله قوی ایجاد می‌کنید که می‌تواند تغییرات داده‌ها را در زمان وقوع آنها ضبط و پردازش کند و پایه‌ای برای تجزیه و تحلیل بلادرنگ، انبار داده‌ها و سایر برنامه‌های کاربردی حیاتی فراهم کند.

چیزی که یاد خواهید گرفت

  • نحوه ایجاد یک خط لوله جمع آوری داده های تغییرات اساسی
  • Dataproc برای پردازش جریان
  • Cloud Pub/Sub برای پیام رسانی بلادرنگ
  • اصول اولیه آپاچی اسپارک

این کد لبه روی Dataproc و Cloud Pub/Sub متمرکز شده است. مفاهیم غیر مرتبط و بلوک‌های کد محو شده‌اند و برای شما ارائه می‌شوند تا به سادگی کپی و جای‌گذاری کنید.

آنچه شما نیاز دارید

  • یک حساب GCP فعال با راه اندازی پروژه. اگر ندارید، می توانید برای یک دوره آزمایشی رایگان ثبت نام کنید.
  • gcloud CLI نصب و پیکربندی شد.
  • Python 3.7+ برای شبیه سازی تغییرات پایگاه داده و تعامل با Pub/Sub نصب شده است.
  • دانش پایه Dataproc، Cloud Pub/Sub، Apache Spark و Python.

قبل از شروع

برای فعال کردن API های مورد نیاز، دستور زیر را در ترمینال اجرا کنید:

gcloud services enable \
    dataproc
.googleapis.com \
    pubsub
.googleapis.com \

2. Cloud Pub/Sub را راه اندازی کنید

یک موضوع ایجاد کنید

از این موضوع برای انتشار تغییرات پایگاه داده استفاده خواهد شد. Dataproc job مصرف کننده این پیام ها خواهد بود و پیام ها را برای تغییر گرفتن داده ها پردازش می کند. اگر می‌خواهید درباره موضوعات بیشتر بدانید، می‌توانید اسناد رسمی را اینجا بخوانید.

gcloud pubsub topics create database-changes

یک اشتراک ایجاد کنید

اشتراکی ایجاد کنید که برای مصرف پیام ها در Pub/Sub استفاده می شود. برای اطلاعات بیشتر در مورد اشتراک‌ها، می‌توانید اسناد رسمی را اینجا بخوانید.

gcloud pubsub subscriptions create --topic database-changes change-subscriber

3. شبیه سازی تغییرات پایگاه داده

مراحل

  1. یک اسکریپت پایتون (به عنوان مثال simulate_cdc.py ) برای شبیه سازی تغییرات پایگاه داده و انتشار آنها در Pub/Sub ایجاد کنید.
from google.cloud import pubsub_v1
import json
import time
import random

project_id = "YOUR_PROJECT_ID"  # Replace with your GCP project ID
topic_id = "database-changes"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

def publish_message(data):
   
data_str = json.dumps(data).encode("utf-8")
   
future = publisher.publish(topic_path, data=data_str)
   
print(f"Published message ID: {future.result()}")

def simulate_change():
   
change_types = ["INSERT", "UPDATE", "DELETE"]
   
change_type = random.choice(change_types)
   
record_id = random.randint(1, 100)
   
timestamp = time.time()
   
change_data = {
       
"change_type": change_type,
       
"record_id": record_id,
       
"timestamp": timestamp,
       
"data": {"field1": "value1", "field2": "value2"}, #Place holder data.
   
}
   
publish_message(change_data)

if __name__ == "__main__":
   
while True:
       
simulate_change()
       
time.sleep(2)  # Simulate changes every 2 seconds

شناسه پروژه GCP واقعی خود را جایگزین YOUR_PROJECT_ID کنید

  1. کتابخانه Pub/Sub Client را نصب کنید:
pip install google-cloud-pubsub
  1. اسکریپت را روی ترمینال خود اجرا کنید. این اسکریپت به طور مداوم اجرا می شود و هر 2 ثانیه یک بار پیام هایی را به موضوع Pub/Sub منتشر می کند.
python simulate_cdc.py
  1. پس از اجرای اسکریپت برای فرض کنید 1 دقیقه، پیام های کافی در Pub/Sub برای مصرف خواهید داشت. بسته به سیستم عامل خود می توانید اسکریپت پایتون در حال اجرا را با فشار دادن ctrl + C یا Cmd + C خاتمه دهید.
  2. مشاهده پیام های منتشر شده:

ترمینال دیگری را باز کنید و دستور زیر را برای مشاهده پیام های منتشر شده اجرا کنید:

gcloud pubsub subscriptions pull --auto-ack change-subscriber

شما باید یک ردیف جدول حاوی پیام و سایر فیلدها را ببینید:

{"change_type": "UPDATE"، "record_id": 10، "timestamp": 1742466264.888465، "data": {"field1": "value1"، "field2": "value2"}}

توضیح

  • اسکریپت پایتون با ایجاد تصادفی رویدادهای INSERT ، UPDATE یا DELETE ، تغییرات پایگاه داده را شبیه سازی می کند.
  • هر تغییر به عنوان یک شی JSON نشان داده می شود که شامل نوع تغییر، شناسه رکورد، مهر زمانی و داده است.
  • این اسکریپت از کتابخانه سرویس گیرنده Cloud Pub/Sub برای انتشار این رویدادهای تغییر در مبحث database-changes استفاده می کند.
  • دستور subscriber به شما امکان می دهد پیام هایی را که به موضوع pub/sub ارسال می شوند را مشاهده کنید.

4. یک حساب سرویس برای Dataproc ایجاد کنید

در این بخش، یک حساب سرویس ایجاد می کنید که خوشه Dataproc می تواند از آن استفاده کند. شما همچنین مجوزهای لازم را برای اجازه به نمونه های کلاستر برای دسترسی به Cloud Pub/sub و Dataproc اختصاص می دهید.

  1. ایجاد یک حساب خدمات:
export SERVICE_ACCOUNT_NAME="dataproc-service-account"

gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
  1. نقش کارمند Dataproc را اضافه کنید تا به حساب سرویس اجازه دهید خوشه‌ها ایجاد کند و کارها را اجرا کند. شناسه حساب سرویس تولید شده در دستور قبلی را به عنوان عضو در دستور زیر اضافه کنید:
export PROJECT=$(gcloud info --format='value(config.project)')

gcloud projects add-iam-policy-binding $PROJECT \
        --role roles/dataproc.worker \
        --member="serviceAccount:<your-service-account-with-domain>"
  1. نقش مشترک Pub/sub را اضافه کنید تا به حساب سرویس اجازه دهید در اشتراک Pub/sub "تغییر-مشترک" مشترک شود:
gcloud beta pubsub subscriptions add-iam-policy-binding \
        change
-subscriber \
       
--role roles/pubsub.subscriber \
       
--member="serviceAccount:<your-service-account-with-domain"

5. یک خوشه Dataproc ایجاد کنید

خوشه Dataproc برنامه spark را اجرا می کند که پیام ها را در Pub/sub پردازش می کند. شما به حساب سرویس ایجاد شده در بخش قبلی نیاز دارید. Dataproc این حساب سرویس را به هر نمونه در خوشه اختصاص می دهد تا همه نمونه ها مجوزهای صحیح را برای اجرای برنامه دریافت کنند.

برای ایجاد یک کلاستر Dataproc از دستور زیر استفاده کنید:

gcloud dataproc clusters create cdc-dataproc-cluster \
    --region=us-central1 \
    --zone=us-central1-a \
    --scopes=pubsub,datastore \
    --image-version=1.3 \
    --service-account="<your-service-account-with-domain-id>"

6. Spark Job را به Dataproc Cluster ارسال کنید

برنامه استریم جرقه پیام‌های تغییر پایگاه داده را در Pub/sub پردازش می‌کند و آنها را در کنسول چاپ می‌کند.

مراحل

  1. یک دایرکتوری ایجاد کنید و کد منبع مصرف کننده را به فایل PubsubConsumer.scala اضافه کنید
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc
-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala
package demo

import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PubsubConsumer {

 
def createContext(projectID: String, checkpointDirectory: String)
   
: StreamingContext = {

   
// [START stream_setup]
   
val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
   
val ssc = new StreamingContext(sparkConf, Seconds(1))

   
// Set the checkpoint directory
   
val yarnTags = sparkConf.get("spark.yarn.tags")
   
val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
   
ssc.checkpoint(checkpointDirectory + '/' + jobId)
   
   
// Create stream
   
val messagesStream: DStream[String] = PubsubUtils
     
.createStream(
       
ssc,
       
projectID,
       
None,
       
"change-subscriber",  // Cloud Pub/Sub subscription for incoming database updates
       
SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
     
.map(message => new String(message.getData(), StandardCharsets.UTF_8))
   
// [END stream_setup]

   
processStringDStream(messagesStream)
   
       
ssc
 
}

 
def processStringDStream(stringDStream: DStream[String]): Unit = {
   
stringDStream.foreachRDD { rdd =>
     
if (!rdd.isEmpty()) {
       
val listOfStrings: List[String] = rdd.collect().toList
       
listOfStrings.foreach(str => println(s"message received: $str"))
     
} else {
       
println("looking for message...")
     
}
   
}
 
}

 
def main(args: Array[String]): Unit = {
   
if (args.length != 2) {
     
System.err.println("arguments are not passed correctly!")
     
System.exit(1)
   
}

   
val Seq(projectID, checkpointDirectory) = args.toSeq

   
// Create Spark context
   
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
     
() => createContext(projectID, checkpointDirectory))

   
// Start streaming until we receive an explicit termination
   
ssc.start()
   
ssc.awaitTermination()
 
}

}
  1. موارد زیر را ایجاد کرده و به pom.xml اضافه کنید
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <jvm.options.xms>-Xms512M</jvm.options.xms>
    <jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
    <jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
    <jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
  </properties>

  <groupId>dataproc-spark-demos</groupId>
  <artifactId>spark-streaming-pubsub-demo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.2.0</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-datastore</artifactId>
      <version>1.34.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>spark-streaming-pubsub_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>

    <dependency>
      <groupId>org.scalacheck</groupId>
      <artifactId>scalacheck_2.11</artifactId>
      <version>1.14.0</version>
      <scope>test</scope>
    </dependency>

  </dependencies>

  <build>
    <plugins>

      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.3.2</version>
        <executions>
          <execution>
            <id>compile</id>
            <goals>
              <goal>compile</goal>
            </goals>
            <phase>compile</phase>
          </execution>
          <execution>
            <id>test-compile</id>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <phase>test-compile</phase>
          </execution>
          <execution>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>demo.PubsubConsumer</mainClass>
                </transformer>
              </transformers>
              <relocations>
                <relocation>
                  <pattern>com</pattern>
                  <shadedPattern>repackaged.com</shadedPattern>
                  <includes>
                    <include>com.google.protobuf.**</include>
                    <include>com.google.common.**</include>
                  </includes>
                </relocation>
              </relocations>
            </configuration>
          </execution>
        </executions>
      </plugin>

    </plugins>
  </build>

</project>
  1. به دایرکتوری spark پروژه تغییر دهید و مسیر را در یک متغیر محیطی ذخیره کنید تا بعدا استفاده شود:
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
  1. دایرکتوری را تغییر دهید:
cd $REPO_ROOT/spark
  1. جاوا 1.8 را دانلود کنید و پوشه را در /usr/lib/jvm/ قرار دهید. سپس JAVA_HOME خود را برای اشاره به این تغییر دهید:
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
  1. jar برنامه را بسازید
mvn clean package

بایگانی spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar حاوی کد برنامه و وابستگی ها در دایرکتوری spark/target ایجاد شده است.

  1. درخواست اسپارک را ارسال کنید:
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"

gcloud dataproc jobs submit spark \
    --cluster cdc-dataproc-cluster \
    --region us-central1 \
    --async \
    --jar target/$JAR \
    --max-failures-per-hour 10 \
    --properties $SPARK_PROPERTIES \
    -- $ARGUMENTS
  1. لیست مشاغل فعال را نمایش دهید و مقدار JOB_ID را برای آن شغل یادداشت کنید:
gcloud dataproc jobs list --region=us-central1 --state-filter=active

خروجی شبیه به

JOB_ID                            TYPE   STATUS
473ecb6d14e2483cb88a18988a5b2e56  spark  RUNNING
  1. با باز کردن URL زیر در مرورگر خود، خروجی کار را مشاهده کنید. [JOB_ID] را با مقدار ذکر شده در مرحله قبل جایگزین کنید.
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
  1. خروجی مشابه موارد زیر است:
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...

کار استریم جرقه ای که در Dataproc اجرا می شود، پیام ها را از Pub/sub می کشد، آنها را پردازش می کند و خروجی را به کنسول نمایش می دهد.

  1. خاتمه کار: دستور زیر را برای خاتمه کار اجرا کنید. جایگزینی JOB_ID با همان موردی است که قبلاً اشاره کردیم
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet

تبریک میگم شما به تازگی یک خط لوله CDC قدرتمند ایجاد کرده اید که تغییرات پایگاه داده را در Pub/sub ضبط می کند و آنها را با استفاده از جریان جرقه در حال اجرا در Cloud Dataproc پردازش می کند.

7. پاک کن

منابعی را که ایجاد کرده اید پاک کنید تا در آینده برای آن ها صورتحساب دریافت نکنید. ساده ترین راه برای حذف صورتحساب، حذف پروژه ای است که برای آموزش ایجاد کرده اید. از طرف دیگر، می توانید منابع فردی را حذف کنید.

دستورات زیر را برای حذف منابع فردی اجرا کنید

gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics
delete database-changes --quiet
gcloud pubsub subscriptions
delete change-subscriber --quiet
gcloud iam service
-accounts delete <your-service-account-with-domain> --quiet

8. تبریک میگم

تبریک می‌گوییم، شما به‌تازگی یک کد آزمایشگاهی عملی را تکمیل کرده‌اید که نشان می‌دهد چگونه می‌توان یک خط لوله داده در زمان واقعی با استفاده از Google Cloud Platform ایجاد کرد. بیایید آنچه را که به دست آورده‌اید خلاصه کنیم:

  • شبیه‌سازی تغییر داده‌ها (CDC): شما اصول CDC را یاد گرفتید و یک اسکریپت پایتون را برای شبیه‌سازی تغییرات پایگاه‌داده پیاده‌سازی کردید، و رویدادهایی را ایجاد کرد که نشان‌دهنده تغییرات داده‌ها در زمان واقعی است.
  • Leveraged Cloud Pub/Sub: موضوعات و اشتراک‌های Cloud Pub/Sub را تنظیم می‌کنید و یک سرویس پیام‌رسانی مقیاس‌پذیر و قابل اعتماد برای پخش رویدادهای CDC خود ارائه می‌کنید. شما تغییرات پایگاه داده شبیه سازی شده خود را به Pub/Sub منتشر کردید و یک جریان داده در زمان واقعی ایجاد کردید.
  • داده های پردازش شده با Dataproc و Spark: شما یک خوشه Dataproc تهیه کردید و یک کار جریانی Spark را برای مصرف پیام های اشتراک Pub/Sub خود به کار بردید. شما رویدادهای CDC ورودی را در زمان واقعی پردازش و تبدیل کردید، و نتایج را در گزارش‌های کار Dataproc خود نمایش می‌دهید.
  • ساخت یک خط لوله زمان واقعی از انتها به انتها: شما با موفقیت این خدمات را برای ایجاد یک خط لوله داده کامل که تغییرات داده را در زمان واقعی ضبط، جریان و پردازش می کند، ادغام کردید. شما تجربه عملی در ساختن سیستمی به دست آوردید که بتواند جریان های داده پیوسته را مدیریت کند.
  • از اتصال دهنده Spark Pub/Sub استفاده کردید: شما با موفقیت یک خوشه Dataproc را برای استفاده از اتصال Spark Pub/Sub پیکربندی کردید، که برای Spark Structured Streaming برای خواندن داده ها از Pub/Sub بسیار مهم است.

اکنون شما یک پایه محکم برای ایجاد خطوط لوله داده پیچیده تر و پیچیده تر برای برنامه های مختلف، از جمله تجزیه و تحلیل بلادرنگ، انبار داده ها، و معماری های میکروسرویس دارید. به کاوش و ساختن ادامه دهید!

اسناد مرجع