ثبت تغییر ساختمان با استفاده از Dataproc و Cloud Pub/Sub

۱. مقدمه

df8070bd84336207.png

آخرین به‌روزرسانی: 2025-06-19

ثبت داده‌های تغییر چیست؟

ثبت تغییرات داده‌ها (CDC) مجموعه‌ای از الگوهای طراحی نرم‌افزار است که برای تعیین و ردیابی داده‌هایی که در یک پایگاه داده تغییر کرده‌اند، استفاده می‌شود. به عبارت ساده‌تر، این روشی برای ثبت و ضبط تغییرات ایجاد شده در داده‌ها است تا بتوان این تغییرات را در سیستم‌های دیگر تکرار کرد.

ثبت داده‌های تغییر (CDC) در طیف گسترده‌ای از سناریوهای داده‌محور مانند مهاجرت داده‌ها، انبارداری و تجزیه و تحلیل داده‌های بلادرنگ، بازیابی اطلاعات و دسترسی‌پذیری بالا، حسابرسی و انطباق و غیره فوق‌العاده مفید است.

مهاجرت داده

CDC با فراهم کردن امکان انتقال تدریجی داده‌ها، کاهش زمان از کارافتادگی و به حداقل رساندن اختلال، پروژه‌های مهاجرت داده‌ها را ساده می‌کند.

انبارداری و تحلیل داده‌های بلادرنگ

CDC تضمین می‌کند که انبارهای داده و سیستم‌های تحلیلی دائماً با آخرین تغییرات پایگاه‌های داده عملیاتی به‌روز می‌شوند.

این امر به کسب‌وکارها اجازه می‌دهد تا بر اساس اطلاعات بلادرنگ تصمیم‌گیری کنند.

بازیابی پس از سانحه و دسترسی‌پذیری بالا

CDC امکان کپی کردن داده‌ها به صورت بلادرنگ (Real-time) را در پایگاه‌های داده ثانویه برای اهداف بازیابی پس از فاجعه فراهم می‌کند. در صورت بروز خرابی، CDC امکان انتقال سریع به پایگاه داده ثانویه را فراهم می‌کند و زمان از کار افتادگی و از دست رفتن داده‌ها را به حداقل می‌رساند.

حسابرسی و انطباق

CDC یک مسیر حسابرسی دقیق از تغییرات داده‌ها ارائه می‌دهد که برای انطباق با الزامات قانونی و نظارتی ضروری است.

آنچه خواهید ساخت

در این آزمایشگاه کد، شما قصد دارید با استفاده از Cloud Pub/Sub، Dataproc، Python و Apache Spark یک خط لوله داده تغییر-داده-ضبط (CDC) بسازید. خط لوله شما:

  • تغییرات پایگاه داده را شبیه‌سازی کنید و آنها را به عنوان رویدادها در Cloud Pub/Sub، یک سرویس پیام‌رسانی مقیاس‌پذیر و قابل اعتماد، منتشر کنید.
  • از قدرت Dataproc، سرویس مدیریت‌شده‌ی Spark و Hadoop گوگل کلود، برای پردازش این رویدادها به صورت بلادرنگ بهره ببرید.

با اتصال این سرویس‌ها، شما یک خط لوله قوی ایجاد خواهید کرد که قادر به ثبت و پردازش تغییرات داده‌ها در حین وقوع است و پایه و اساسی را برای تجزیه و تحلیل بلادرنگ، انبار داده‌ها و سایر برنامه‌های حیاتی فراهم می‌کند.

آنچه یاد خواهید گرفت

  • نحوه ایجاد یک خط لوله ثبت داده‌های تغییر اساسی
  • Dataproc برای پردازش جریانی
  • Cloud Pub/Sub برای پیام‌رسانی بلادرنگ
  • مبانی آپاچی اسپارک

این آزمایشگاه کد بر روی Dataproc و Cloud Pub/Sub تمرکز دارد. مفاهیم و بلوک‌های کد نامربوط نادیده گرفته شده‌اند و برای کپی و پیست ساده در اختیار شما قرار گرفته‌اند.

آنچه نیاز دارید

  • یک حساب کاربری GCP فعال با پروژه راه‌اندازی شده. اگر حساب کاربری ندارید، می‌توانید برای یک دوره آزمایشی رایگان ثبت‌نام کنید.
  • رابط خط فرمان gcloud نصب و پیکربندی شد.
  • پایتون ۳.۷+ برای شبیه‌سازی تغییرات پایگاه داده و تعامل با Pub/Sub نصب شده باشد.
  • آشنایی اولیه با Dataproc، Cloud Pub/Sub، Apache Spark و پایتون.

قبل از شروع

برای فعال کردن API های مورد نیاز، دستور زیر را در ترمینال اجرا کنید:

gcloud services enable \
    dataproc.googleapis.com \
    pubsub.googleapis.com \

۲. راه‌اندازی Cloud Pub/Sub

ایجاد یک موضوع

این تاپیک برای انتشار تغییرات پایگاه داده استفاده خواهد شد. وظیفه Dataproc مصرف‌کننده این پیام‌ها خواهد بود و پیام‌ها را برای ثبت داده‌های تغییر پردازش می‌کند. اگر می‌خواهید درباره تاپیک‌ها بیشتر بدانید، می‌توانید مستندات رسمی را اینجا بخوانید.

gcloud pubsub topics create database-changes

ایجاد اشتراک

یک اشتراک ایجاد کنید که برای استفاده از پیام‌ها در Pub/Sub استفاده خواهد شد. برای کسب اطلاعات بیشتر در مورد اشتراک‌ها، می‌توانید مستندات رسمی را اینجا بخوانید.

gcloud pubsub subscriptions create --topic database-changes change-subscriber

۳. شبیه‌سازی تغییرات پایگاه داده

مراحل

  1. یک اسکریپت پایتون (مثلاً simulate_cdc.py ) ایجاد کنید تا تغییرات پایگاه داده را شبیه‌سازی کرده و آنها را در Pub/Sub منتشر کند.
from google.cloud import pubsub_v1
import json
import time
import random

project_id = "YOUR_PROJECT_ID"  # Replace with your GCP project ID
topic_id = "database-changes"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

def publish_message(data):
    data_str = json.dumps(data).encode("utf-8")
    future = publisher.publish(topic_path, data=data_str)
    print(f"Published message ID: {future.result()}")

def simulate_change():
    change_types = ["INSERT", "UPDATE", "DELETE"]
    change_type = random.choice(change_types)
    record_id = random.randint(1, 100)
    timestamp = time.time()
    change_data = {
        "change_type": change_type,
        "record_id": record_id,
        "timestamp": timestamp,
        "data": {"field1": "value1", "field2": "value2"}, #Place holder data.
    }
    publish_message(change_data)

if __name__ == "__main__":
    while True:
        simulate_change()
        time.sleep(2)  # Simulate changes every 2 seconds

YOUR_PROJECT_ID را با شناسه پروژه GCP واقعی خود جایگزین کنید.

  1. کتابخانه کلاینت Pub/Sub را نصب کنید:
pip install google-cloud-pubsub
  1. اسکریپت را در ترمینال خود اجرا کنید. این اسکریپت به طور مداوم اجرا می‌شود و هر ۲ ثانیه یک بار پیام‌هایی را در تاپیک Pub/Sub منتشر می‌کند.
python simulate_cdc.py
  1. بعد از اجرای اسکریپت به مدت مثلاً ۱ دقیقه، به اندازه کافی پیام در Pub/Sub برای مصرف خواهید داشت. می‌توانید بسته به سیستم عامل خود، با فشار دادن کلیدهای ctrl + C یا Cmd + C، اجرای اسکریپت پایتون را خاتمه دهید.
  2. مشاهده پیام‌های منتشر شده:

یک ترمینال دیگر باز کنید و دستور زیر را برای مشاهده پیام‌های منتشر شده اجرا کنید:

gcloud pubsub subscriptions pull --auto-ack change-subscriber

شما باید یک ردیف جدول حاوی پیام و فیلدهای دیگر را ببینید:

{"change_type": "UPDATE", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}

توضیح

  • اسکریپت پایتون با تولید تصادفی رویدادهای INSERT ، UPDATE یا DELETE ، تغییرات پایگاه داده را شبیه‌سازی می‌کند.
  • هر تغییر به صورت یک شیء JSON نمایش داده می‌شود که شامل نوع تغییر، شناسه رکورد، مهر زمانی و داده‌ها است.
  • این اسکریپت از کتابخانه Cloud Pub/Sub client برای انتشار این رویدادهای تغییر در موضوع database-changes استفاده می‌کند.
  • دستور subscriber به شما امکان می‌دهد پیام‌هایی را که به موضوع pub/sub ارسال می‌شوند، مشاهده کنید.

۴. یک حساب کاربری سرویس برای Dataproc ایجاد کنید

در این بخش، شما یک حساب کاربری Service ایجاد می‌کنید که کلاستر Dataproc می‌تواند از آن استفاده کند. همچنین مجوزهای لازم را برای دسترسی نمونه‌های کلاستر به Cloud Pub/sub و Dataproc اختصاص می‌دهید.

  1. ایجاد حساب کاربری سرویس:
export SERVICE_ACCOUNT_NAME="dataproc-service-account"

gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
  1. نقش کارگر Dataproc را اضافه کنید تا به حساب سرویس اجازه ایجاد خوشه‌ها و اجرای کارها را بدهید. شناسه حساب سرویس تولید شده در دستور قبلی را به عنوان عضو در دستور زیر اضافه کنید:
export PROJECT=$(gcloud info --format='value(config.project)')

gcloud projects add-iam-policy-binding $PROJECT \
        --role roles/dataproc.worker \
        --member="serviceAccount:<your-service-account-with-domain>"
  1. نقش Pub/sub subscriber را اضافه کنید تا به حساب سرویس اجازه دهید در اشتراک Pub/sub از نوع "change-subscriber" مشترک شود:
gcloud beta pubsub subscriptions add-iam-policy-binding \
        change-subscriber \
        --role roles/pubsub.subscriber \
        --member="serviceAccount:<your-service-account-with-domain"

۵. ایجاد یک کلاستر Dataproc

کلاستر Dataproc برنامه‌ی spark را اجرا خواهد کرد که پیام‌ها را در Pub/sub پردازش خواهد کرد. شما به حساب کاربری سرویس ایجاد شده در بخش قبل نیاز دارید. Dataproc این حساب کاربری سرویس را به هر نمونه در کلاستر اختصاص می‌دهد تا تمام نمونه‌ها مجوزهای صحیح برای اجرای برنامه را دریافت کنند.

برای ایجاد یک کلاستر Dataproc از دستور زیر استفاده کنید:

gcloud dataproc clusters create cdc-dataproc-cluster \
    --region=us-central1 \
    --zone=us-central1-a \
    --scopes=pubsub,datastore \
    --image-version=1.3 \
    --service-account="<your-service-account-with-domain-id>"

۶. ارسال کار Spark به خوشه Dataproc

برنامه‌ی استریمینگ اسپارک، پیام‌های تغییر پایگاه داده را در Pub/sub پردازش کرده و آنها را در کنسول چاپ می‌کند.

مراحل

  1. یک دایرکتوری ایجاد کنید و کد منبع مصرف‌کننده را به فایل PubsubConsumer.scala اضافه کنید.
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala 
package demo

import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PubsubConsumer {

  def createContext(projectID: String, checkpointDirectory: String)
    : StreamingContext = {

    // [START stream_setup]
    val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Set the checkpoint directory
    val yarnTags = sparkConf.get("spark.yarn.tags")
    val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
    ssc.checkpoint(checkpointDirectory + '/' + jobId)
    
    // Create stream
    val messagesStream: DStream[String] = PubsubUtils
      .createStream(
        ssc,
        projectID,
        None,
        "change-subscriber",  // Cloud Pub/Sub subscription for incoming database updates
        SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
      .map(message => new String(message.getData(), StandardCharsets.UTF_8))
    // [END stream_setup]

    processStringDStream(messagesStream)
    
        ssc
  }

  def processStringDStream(stringDStream: DStream[String]): Unit = {
    stringDStream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val listOfStrings: List[String] = rdd.collect().toList
        listOfStrings.foreach(str => println(s"message received: $str"))
      } else {
        println("looking for message...")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("arguments are not passed correctly!")
      System.exit(1)
    }

    val Seq(projectID, checkpointDirectory) = args.toSeq

    // Create Spark context
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => createContext(projectID, checkpointDirectory))

    // Start streaming until we receive an explicit termination
    ssc.start()
    ssc.awaitTermination()
  }

}
  1. فایل زیر را ایجاد و به pom.xml اضافه کنید.
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <jvm.options.xms>-Xms512M</jvm.options.xms>
    <jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
    <jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
    <jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
  </properties>

  <groupId>dataproc-spark-demos</groupId>
  <artifactId>spark-streaming-pubsub-demo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.2.0</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-datastore</artifactId>
      <version>1.34.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>spark-streaming-pubsub_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>

    <dependency>
      <groupId>org.scalacheck</groupId>
      <artifactId>scalacheck_2.11</artifactId>
      <version>1.14.0</version>
      <scope>test</scope>
    </dependency>

  </dependencies>

  <build>
    <plugins>

      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.3.2</version>
        <executions>
          <execution>
            <id>compile</id>
            <goals>
              <goal>compile</goal>
            </goals>
            <phase>compile</phase>
          </execution>
          <execution>
            <id>test-compile</id>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <phase>test-compile</phase>
          </execution>
          <execution>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>demo.PubsubConsumer</mainClass>
                </transformer>
              </transformers>
              <relocations>
                <relocation>
                  <pattern>com</pattern>
                  <shadedPattern>repackaged.com</shadedPattern>
                  <includes>
                    <include>com.google.protobuf.**</include>
                    <include>com.google.common.**</include>
                  </includes>
                </relocation>
              </relocations>
            </configuration>
          </execution>
        </executions>
      </plugin>

    </plugins>
  </build>

</project>
  1. به دایرکتوری spark پروژه بروید و مسیر را در یک متغیر محیطی ذخیره کنید تا بعداً از آن استفاده شود:
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
  1. تغییر دایرکتوری:
cd $REPO_ROOT/spark
  1. جاوا ۱.۸ را دانلود کنید و پوشه را در /usr/lib/jvm/ قرار دهید. سپس JAVA_HOME خود را به این تغییر دهید:
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
  1. ساخت فایل jar برنامه
mvn clean package

فایل آرشیو spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar که شامل کد برنامه و وابستگی‌های آن است، در دایرکتوری spark/target ایجاد می‌شود.

  1. درخواست اسپارک را ارسال کنید:
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"

gcloud dataproc jobs submit spark \
    --cluster cdc-dataproc-cluster \
    --region us-central1 \
    --async \
    --jar target/$JAR \
    --max-failures-per-hour 10 \
    --properties $SPARK_PROPERTIES \
    -- $ARGUMENTS
  1. لیست مشاغل فعال را نمایش داده و مقدار JOB_ID را برای شغل یادداشت کنید:
gcloud dataproc jobs list --region=us-central1 --state-filter=active

خروجی شبیه به این خواهد بود:

JOB_ID                            TYPE   STATUS
473ecb6d14e2483cb88a18988a5b2e56  spark  RUNNING
  1. با باز کردن URL زیر در مرورگر خود، خروجی کار را مشاهده کنید. [JOB_ID] را با مقداری که در مرحله قبل مشخص شده است، جایگزین کنید.
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
  1. خروجی مشابه زیر است:
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...

کار استریمینگ اسپارک که در Dataproc اجرا می‌شود، پیام‌ها را از Pub/sub دریافت می‌کند، آنها را پردازش می‌کند و خروجی را در کنسول نمایش می‌دهد.

  1. خاتمه دادن به کار: دستور زیر را برای خاتمه دادن به کار اجرا کنید. JOB_ID را با همان موردی که قبلاً یادداشت کردیم جایگزین کنید.
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet

تبریک! شما به تازگی یک خط لوله CDC قدرتمند ایجاد کرده‌اید که تغییرات پایگاه داده را در Pub/sub ثبت کرده و آنها را با استفاده از جریان جرقه‌ای که در Cloud Dataproc اجرا می‌شود، پردازش می‌کند.

۷. تمیز کردن

منابعی را که ایجاد کرده‌اید، پاک کنید تا در آینده بابت آنها هزینه‌ای از شما دریافت نشود. ساده‌ترین راه برای حذف هزینه، حذف پروژه‌ای است که برای آموزش ایجاد کرده‌اید. به عنوان یک روش جایگزین، می‌توانید منابع را به صورت جداگانه حذف کنید.

برای حذف منابع تکی، دستورات زیر را اجرا کنید

gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics delete database-changes --quiet
gcloud pubsub subscriptions delete change-subscriber --quiet
gcloud iam service-accounts delete <your-service-account-with-domain> --quiet

۸. تبریک

تبریک می‌گویم، شما به تازگی یک آزمایشگاه کد عملی را به پایان رسانده‌اید که نحوه ساخت یک خط لوله داده بلادرنگ قوی با استفاده از پلتفرم ابری گوگل را نشان می‌دهد. بیایید خلاصه‌ای از آنچه انجام داده‌اید را مرور کنیم:

  • ثبت داده‌های تغییر شبیه‌سازی‌شده (CDC): شما اصول اولیه CDC را یاد گرفتید و یک اسکریپت پایتون برای شبیه‌سازی تغییرات پایگاه داده پیاده‌سازی کردید و رویدادهایی را ایجاد کردید که نشان‌دهنده تغییرات داده‌های بلادرنگ هستند.
  • انتشار/ذخیره ابری قدرتمند: شما موضوعات و اشتراک‌های انتشار/ذخیره ابری را تنظیم کردید و یک سرویس پیام‌رسانی مقیاس‌پذیر و قابل اعتماد برای پخش رویدادهای CDC خود فراهم کردید. شما تغییرات شبیه‌سازی‌شده پایگاه داده خود را در انتشار/ذخیره منتشر کردید و یک جریان داده بلادرنگ ایجاد کردید.
  • داده‌های پردازش‌شده با Dataproc و Spark: شما یک کلاستر Dataproc را آماده‌سازی کردید و یک کار Spark Streaming را برای مصرف پیام‌های اشتراک Pub/Sub خود مستقر کردید. رویدادهای CDC ورودی را به‌صورت بلادرنگ پردازش و تبدیل کردید و نتایج را در گزارش‌های کار Dataproc خود نمایش دادید.
  • ساخت یک خط لوله داده بلادرنگ سرتاسری: شما با موفقیت این سرویس‌ها را برای ایجاد یک خط لوله داده کامل که تغییرات داده‌ها را به صورت بلادرنگ ثبت، استریم و پردازش می‌کند، ادغام کردید. شما تجربه عملی در ساخت سیستمی که بتواند جریان‌های داده پیوسته را مدیریت کند، کسب کرده‌اید.
  • استفاده از رابط Pub/Sub اسپارک: شما با موفقیت یک کلاستر Dataproc را برای استفاده از رابط Pub/Sub اسپارک پیکربندی کردید، که برای Spark Structured Streaming جهت خواندن داده‌ها از Pub/Sub بسیار مهم است.

اکنون شما پایه محکمی برای ساخت خطوط داده پیچیده‌تر و ماهرانه‌تر برای برنامه‌های مختلف، از جمله تجزیه و تحلیل بلادرنگ، انبار داده‌ها و معماری‌های میکروسرویس دارید. به کاوش و ساخت ادامه دهید!

اسناد مرجع