إنشاء ميزة "التقاط بيانات التغيير" باستخدام Dataproc وCloud Pub/Sub

إنشاء ميزة "التقاط بيانات التغيير" باستخدام Dataproc وCloud Pub/Sub

لمحة عن هذا الدرس التطبيقي حول الترميز

subjectتاريخ التعديل الأخير: يونيو 19, 2025
account_circleتأليف: Jatin Narula

1. مقدمة

df8070bd84336207.png

تاريخ آخر تعديل: 19‏/06‏/2025

ما هو "تتبُّع تغييرات البيانات"؟

ميزة "تجميع بيانات التغييرات" (CDC) هي مجموعة من أنماط تصميم البرامج المستخدَمة لتحديد البيانات التي تغيّرت في قاعدة بيانات وتتبُّعها. وبعبارة أبسط، هي طريقة لتسجيل التغييرات التي يتم إجراؤها على البيانات حتى يمكن تكرار هذه التغييرات في الأنظمة الأخرى.

إنّ ميزة "تسجيل بيانات التغيير" (CDC) مفيدة بشكلٍ كبير في مجموعة واسعة من السيناريوهات المستندة إلى البيانات، مثل نقل البيانات ومستودعات البيانات والإحصاءات في الوقت الفعلي واسترداد البيانات في حالات الكوارث وإمكانية الاستخدام العالية والتدقيق والامتثال وما إلى ذلك.

نقل البيانات

تعمل ميزة "استعادة البيانات في حالات الطوارئ" على تبسيط مشاريع نقل البيانات من خلال السماح بنقل البيانات بشكل تدريجي، ما يقلل من وقت التوقف عن العمل ويحدّ من الانقطاعات.

تحليلات ومستودعات البيانات في الوقت الفعلي

يضمن مركز CDC تحديث مستودعات البيانات والأنظمة التحليلية باستمرار بآخر التغييرات من قواعد البيانات التشغيلية.

ويسمح ذلك للأنشطة التجارية باتخاذ القرارات استنادًا إلى المعلومات في الوقت الفعلي.

الاستجابة للأزمات وإمكانية الوصول العالية

تتيح ميزة "استرداد البيانات بعد الكوارث" تكرار البيانات في الوقت الفعلي إلى قواعد بيانات ثانوية لأغراض استرداد البيانات بعد الكوارث. في حال حدوث عطل، تسمح ميزة "استرداد البيانات عند حدوث عطل" بنقل البيانات بسرعة إلى قاعدة بيانات ثانوية، ما يقلل من وقت الاستراحة وفقدان البيانات.

التدقيق والامتثال

توفّر مراكز السيطرة على الأمراض "مسار تدقيق" مفصّلاً لتغييرات البيانات، وهو أمر ضروري للامتثال للمتطلبات التنظيمية.

التطبيق الذي ستصممه

في هذا الدليل التعليمي حول الرموز البرمجية، ستُنشئ مسار بيانات لميزة "تسجيل التغييرات في البيانات" (CDC) باستخدام Cloud Pub/Sub وDataproc وPython وApache Spark. سيؤدي مسار الإحالة الناجحة إلى ما يلي:

  • يمكنك محاكاة تغييرات قاعدة البيانات ونشرها كأحداث في Cloud Pub/Sub، وهي خدمة مراسلة قابلة للتطوير وموثوقة.
  • يمكنك الاستفادة من إمكانات Dataproc، وهي خدمة Spark وHadoop المُدارة من Google Cloud، لمعالجة هذه الأحداث في الوقت الفعلي.

من خلال ربط هذه الخدمات، يمكنك إنشاء مسار عمل قوي قادر على تسجيل تغييرات البيانات ومعالجتها أثناء حدوثها، ما يوفر أساسًا للتحليلات في الوقت الفعلي ومستودعات البيانات والتطبيقات المهمة الأخرى.

ما ستتعرّف عليه

  • كيفية إنشاء مسار أساسي لجمع بيانات التغييرات
  • Dataproc لمعالجة البث
  • Cloud Pub/Sub للمراسلة في الوقت الفعلي
  • أساسيات Apache Spark

يركز هذا الدرس التطبيقي حول الترميز على Dataproc وCloud Pub/Sub. يتم تجاهل المفاهيم غير ذات الصلة ووحدات الرموز البرمجية، ويتم توفيرها لك لنسخها ولصقها بسهولة.

المتطلبات

  • حساب نشط على Google Cloud Platform تم إعداد مشروع فيه إذا لم يكن لديك حساب، يمكنك الاشتراك في فترة تجريبية مجانية.
  • تثبيت gcloud CLI وضبطه
  • تثبيت الإصدار 3.7 من بايثون أو الإصدارات الأحدث لمحاكاة تغييرات قاعدة البيانات والتفاعل مع Pub/Sub
  • معرفة أساسية بـ Dataproc وCloud Pub/Sub وApache Spark وPython

قبل البدء

نفِّذ الأمر التالي في الوحدة الطرفية لتفعيل واجهات برمجة التطبيقات المطلوبة:

gcloud services enable \
    dataproc
.googleapis.com \
    pubsub
.googleapis.com \

2. إعداد Cloud Pub/Sub

إنشاء موضوع

سيتم استخدام هذا الموضوع لنشر تغييرات قاعدة البيانات. ستكون وظيفة Dataproc هي المستخدِم لهذه الرسائل وستعالج الرسائل لتسجيل بيانات التغييرات. إذا أردت معرفة المزيد من المعلومات عن المواضيع، يمكنك الاطّلاع على المستندات الرسمية هنا.

gcloud pubsub topics create database-changes

إنشاء اشتراك

أنشئ اشتراكًا سيتم استخدامه لاستهلاك الرسائل في Pub/Sub. للاطّلاع على مزيد من المعلومات عن الاشتراكات، يمكنك قراءة المستندات الرسمية هنا.

gcloud pubsub subscriptions create --topic database-changes change-subscriber

3. محاكاة تغييرات قاعدة البيانات

الخطوات

  1. أنشئ نصًا برمجيًا بلغة Python (مثل simulate_cdc.py) لمحاكاة تغييرات قاعدة البيانات ونشرها على Pub/Sub
from google.cloud import pubsub_v1
import json
import time
import random

project_id = "YOUR_PROJECT_ID"  # Replace with your GCP project ID
topic_id = "database-changes"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

def publish_message(data):
   
data_str = json.dumps(data).encode("utf-8")
   
future = publisher.publish(topic_path, data=data_str)
   
print(f"Published message ID: {future.result()}")

def simulate_change():
   
change_types = ["INSERT", "UPDATE", "DELETE"]
   
change_type = random.choice(change_types)
   
record_id = random.randint(1, 100)
   
timestamp = time.time()
   
change_data = {
       
"change_type": change_type,
       
"record_id": record_id,
       
"timestamp": timestamp,
       
"data": {"field1": "value1", "field2": "value2"}, #Place holder data.
   
}
   
publish_message(change_data)

if __name__ == "__main__":
   
while True:
       
simulate_change()
       
time.sleep(2)  # Simulate changes every 2 seconds

استبدِل YOUR_PROJECT_ID بمعرّف مشروعك الفعلي على Google Cloud Platform.

  1. ثبِّت مكتبة برامج Pub/Sub باتّباع الخطوات التالية:
pip install google-cloud-pubsub
  1. شغِّل النص البرمجي في وحدة التحكّم الطرفية. سيتم تشغيل هذا النص البرمجي بشكل مستمر ونشر الرسائل كل ثانيتَين في موضوع Pub/Sub.
python simulate_cdc.py
  1. بعد تشغيل النص البرمجي لمدة دقيقة واحدة مثلاً، ستتوفّر لديك رسائل كافية في Pub/Sub للاستهلاك. يمكنك إنهاء نص python البرمجي الجاري عن طريق الضغط على Ctrl ‏+ C أو Cmd ‏+ C، حسب نظام التشغيل.
  2. عرض الرسائل المنشورة:

افتح وحدة طرفية أخرى ونفِّذ الأمر التالي لعرض الرسائل المنشورة:

gcloud pubsub subscriptions pull --auto-ack change-subscriber

من المفترض أن يظهر لك صف جدول يحتوي على الرسالة والحقول الأخرى:

{"change_type": "UPDATE", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}

الشرح

  • يحاكي نص Python البرمجي تغييرات قاعدة البيانات من خلال إنشاء أحداث INSERT أو UPDATE أو DELETE بشكل عشوائي.
  • يتم تمثيل كل تغيير كعنصر JSON يحتوي على نوع التغيير ومعرّف السجلّ والطابع الزمني والبيانات.
  • يستخدم النص البرمجي مكتبة عملاء Cloud Pub/Sub لنشر أحداث التغييرات هذه إلى موضوع database-changes.
  • يتيح لك الأمر subscriber عرض الرسائل التي يتم إرسالها إلى موضوع Pub/Sub.

4. إنشاء حساب خدمة لخدمة Dataproc

في هذا القسم، يمكنك إنشاء حساب خدمة يمكن أن تستخدمه مجموعة Dataproc. يمكنك أيضًا منح الأذونات اللازمة للسماح لمثيلَي المجموعة بالوصول إلى Cloud Pub/Sub وDataproc.

  1. لإنشاء حساب خدمة:
export SERVICE_ACCOUNT_NAME="dataproc-service-account"

gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
  1. أضِف دور عامل Dataproc للسماح لحساب الخدمة بإنشاء مجموعات وتشغيل المهام. أضِف رقم تعريف حساب الخدمة الذي تم إنشاؤه في الأمر السابق كعضو في الأمر أدناه:
export PROJECT=$(gcloud info --format='value(config.project)')

gcloud projects add-iam-policy-binding $PROJECT \
        --role roles/dataproc.worker \
        --member="serviceAccount:<your-service-account-with-domain>"
  1. أضِف دور "مشترك في Pub/sub" للسماح لحساب الخدمة بالاشتراك في اشتراك Pub/sub "change-subscriber":
gcloud beta pubsub subscriptions add-iam-policy-binding \
        change
-subscriber \
       
--role roles/pubsub.subscriber \
       
--member="serviceAccount:<your-service-account-with-domain"

5. إنشاء مجموعة Dataproc

ستشغّل مجموعة Dataproc تطبيق Spark الذي سيعالج الرسائل في Pub/Sub. ستحتاج إلى حساب الخدمة الذي تم إنشاؤه في القسم السابق. تحدِّد Dataproc حساب الخدمة هذا لكلّ نسخة في المجموعة كي تحصل جميع النُسخ على الأذونات الصحيحة لتشغيل التطبيق.

استخدِم الأمر التالي لإنشاء مجموعة Dataproc:

gcloud dataproc clusters create cdc-dataproc-cluster \
    --region=us-central1 \
    --zone=us-central1-a \
    --scopes=pubsub,datastore \
    --image-version=1.3 \
    --service-account="<your-service-account-with-domain-id>"

6. إرسال مهمة Spark إلى مجموعة Dataproc

يعالج تطبيق Spark Streaming رسائل تغيير قاعدة البيانات في Pub/Sub ويطبعها في وحدة التحكّم.

الخطوات

  1. أنشئ دليلاً وأضِف رمز المصدر للمستهلك إلى ملف PubsubConsumer.scala.
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc
-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala
package demo

import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PubsubConsumer {

 
def createContext(projectID: String, checkpointDirectory: String)
   
: StreamingContext = {

   
// [START stream_setup]
   
val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
   
val ssc = new StreamingContext(sparkConf, Seconds(1))

   
// Set the checkpoint directory
   
val yarnTags = sparkConf.get("spark.yarn.tags")
   
val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
   
ssc.checkpoint(checkpointDirectory + '/' + jobId)
   
   
// Create stream
   
val messagesStream: DStream[String] = PubsubUtils
     
.createStream(
       
ssc,
       
projectID,
       
None,
       
"change-subscriber",  // Cloud Pub/Sub subscription for incoming database updates
       
SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
     
.map(message => new String(message.getData(), StandardCharsets.UTF_8))
   
// [END stream_setup]

   
processStringDStream(messagesStream)
   
       
ssc
 
}

 
def processStringDStream(stringDStream: DStream[String]): Unit = {
   
stringDStream.foreachRDD { rdd =>
     
if (!rdd.isEmpty()) {
       
val listOfStrings: List[String] = rdd.collect().toList
       
listOfStrings.foreach(str => println(s"message received: $str"))
     
} else {
       
println("looking for message...")
     
}
   
}
 
}

 
def main(args: Array[String]): Unit = {
   
if (args.length != 2) {
     
System.err.println("arguments are not passed correctly!")
     
System.exit(1)
   
}

   
val Seq(projectID, checkpointDirectory) = args.toSeq

   
// Create Spark context
   
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
     
() => createContext(projectID, checkpointDirectory))

   
// Start streaming until we receive an explicit termination
   
ssc.start()
   
ssc.awaitTermination()
 
}

}
  1. أنشئ ما يلي وأضِفه إلى pom.xml:
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <jvm.options.xms>-Xms512M</jvm.options.xms>
    <jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
    <jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
    <jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
  </properties>

  <groupId>dataproc-spark-demos</groupId>
  <artifactId>spark-streaming-pubsub-demo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.2.0</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-datastore</artifactId>
      <version>1.34.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>spark-streaming-pubsub_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>

    <dependency>
      <groupId>org.scalacheck</groupId>
      <artifactId>scalacheck_2.11</artifactId>
      <version>1.14.0</version>
      <scope>test</scope>
    </dependency>

  </dependencies>

  <build>
    <plugins>

      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.3.2</version>
        <executions>
          <execution>
            <id>compile</id>
            <goals>
              <goal>compile</goal>
            </goals>
            <phase>compile</phase>
          </execution>
          <execution>
            <id>test-compile</id>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <phase>test-compile</phase>
          </execution>
          <execution>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>demo.PubsubConsumer</mainClass>
                </transformer>
              </transformers>
              <relocations>
                <relocation>
                  <pattern>com</pattern>
                  <shadedPattern>repackaged.com</shadedPattern>
                  <includes>
                    <include>com.google.protobuf.**</include>
                    <include>com.google.common.**</include>
                  </includes>
                </relocation>
              </relocations>
            </configuration>
          </execution>
        </executions>
      </plugin>

    </plugins>
  </build>

</project>
  1. انتقِل إلى دليل Spark للمشروع واحفظ المسار في متغيّر بيئة لاستخدامه لاحقًا:
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
  1. تغيير الدليل:
cd $REPO_ROOT/spark
  1. نزِّل Java 1.8 وضع المجلد في ‎ /usr/lib/jvm/. بعد ذلك، غيِّر JAVA_HOME للإشارة إلى ما يلي:
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
  1. إنشاء حزمة jar للتطبيق
mvn clean package

يتم إنشاء أرشيف spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar الذي يحتوي على رمز التطبيق والملفات المضمّنة في الدليل spark/target.

  1. أرسِل طلب Spark:
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"

gcloud dataproc jobs submit spark \
    --cluster cdc-dataproc-cluster \
    --region us-central1 \
    --async \
    --jar target/$JAR \
    --max-failures-per-hour 10 \
    --properties $SPARK_PROPERTIES \
    -- $ARGUMENTS
  1. اعرض قائمة المهام النشطة وسجِّل قيمة JOB_ID للمهمة:
gcloud dataproc jobs list --region=us-central1 --state-filter=active

ستبدو النتيجة على النحو التالي:

JOB_ID                            TYPE   STATUS
473ecb6d14e2483cb88a18988a5b2e56  spark  RUNNING
  1. يمكنك الاطّلاع على نتيجة المهمة من خلال فتح عنوان URL التالي في المتصفّح. استبدِل [JOB_ID] بالقيمة التي تمّت الإشارة إليها في الخطوة السابقة.
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
  1. تكون النتيجة مشابهة لما يلي:
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...

تُسحب مهمة Spark Streaming التي تعمل في Dataproc الرسائل من Pub/sub وتعالجها وتعرض الإخراج في وحدة التحكّم.

  1. إنهاء المهمة: يمكنك تنفيذ الأمر التالي لإنهاء المهمة. استبدِل JOB_ID بالقيمة نفسها التي ذكرناها سابقًا.
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet

تهانينا! لقد أنشأت للتوّ مسارًا فعّالاً للحفاظ على اتّساق البيانات يسجّل تغييرات قاعدة البيانات في Pub/Sub ويعالجها باستخدام Spark Streaming الذي يعمل في Cloud Dataproc.

7. تَنظيم

يمكنك إزالة أيّ موارد أنشأتها لكي لا يتم تحصيل رسوم منها في المستقبل. إنّ أسهل طريقة لإيقاف الفوترة هي حذف المشروع الذي أنشأته للدليل التعليمي. بدلاً من ذلك، يمكنك حذف موارد فردية.

شغِّل الأوامر التالية لحذف موارد فردية.

gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics
delete database-changes --quiet
gcloud pubsub subscriptions
delete change-subscriber --quiet
gcloud iam service
-accounts delete <your-service-account-with-domain> --quiet

8. تهانينا

نشكرك على إكمال دورة تدريبية عملية حول رموز البرامج توضّح كيفية إنشاء مسار بيانات قوي في الوقت الفعلي باستخدام Google Cloud Platform. لنلخص ما تم إنجازه:

  • التقاط البيانات المتغيّرة (CDC) المحاكي: لقد تعرّفت على أساسيات ميزة "التقاط البيانات المتغيّرة" ونفّذت نصًا برمجيًا بلغة Python لمحاكاة تغييرات قاعدة البيانات، ما يؤدي إلى إنشاء أحداث تمثّل تعديلات البيانات في الوقت الفعلي.
  • الاستفادة من Cloud Pub/Sub: يمكنك إعداد مواضيع واشتراكات Cloud Pub/Sub، ما يوفر خدمة رسائل قابلة للتطوير وموثوقة لبث أحداث "الاستنساخ من جهة العميل". لقد نشرت تغييرات قاعدة البيانات المحاكية في Pub/Sub، ما أدى إلى إنشاء مصدر بيانات في الوقت الفعلي.
  • البيانات التي تمت معالجتها باستخدام Dataproc وSpark: لقد وفّرت مجموعة Dataproc ونشرت مهمة Spark Streaming لاستخدام الرسائل من اشتراكك في Pub/Sub. تمت معالجة أحداث CDC الواردة وتحويلها في الوقت الفعلي، مع عرض النتائج في سجلّات مهام Dataproc.
  • إنشاء سلسلة إجراءات متكاملة في الوقت الفعلي: لقد دمجت هذه الخدمات بنجاح لإنشاء سلسلة إجراءات كاملة للبيانات تعمل على تسجيل تغييرات البيانات وبثها ومعالجتها في الوقت الفعلي. اكتسبت خبرة عملية في إنشاء نظام يمكنه التعامل مع مصادر البيانات المستمرة.
  • استخدام موصِّل Spark Pub/Sub: تمكّنت من ضبط إعدادات مجموعة Dataproc لاستخدام موصِّل Spark Pub/Sub، وهو أمر مهم لكي تتمكّن Spark Structured Streaming من قراءة البيانات من Pub/Sub.

لديك الآن أساسًا متينًا لإنشاء مسارات بيانات أكثر تعقيدًا وتطورًا لتطبيقات مختلفة، بما في ذلك الإحصاءات في الوقت الفعلي ومستودعات البيانات وتصميمات الخدمات الصغيرة. ننصحك بمواصلة الاستكشاف وإنشاء المحتوى.

المستندات المرجعية