إنشاء ميزة "التقاط بيانات التغيير" باستخدام Dataproc وCloud Pub/Sub

1. مقدمة

df8070bd84336207.png

تاريخ آخر تعديل: 19-06-2025

ما هي ميزة "تسجيل البيانات المتغيرة"؟

تغيير التقاط البيانات (CDC) هو مجموعة من أنماط تصميم البرامج المستخدَمة لتحديد وتتبُّع البيانات التي تم تغييرها في قاعدة البيانات. ببساطة، هي طريقة لتسجيل التغييرات التي يتم إجراؤها على البيانات حتى يمكن تكرار هذه التغييرات في أنظمة أخرى.

تُعدّ ميزة "التقاط البيانات المتغيرة" (CDC) مفيدة للغاية في مجموعة كبيرة من السيناريوهات المستندة إلى البيانات، مثل نقل البيانات، وتخزين البيانات وتحليلها في الوقت الفعلي، واسترداد البيانات بعد حدوث كارثة، وتوفير إمكانية الوصول العالية، والتدقيق والامتثال، وما إلى ذلك.

نقل البيانات

تسهّل ميزة "التقاط البيانات المتغيرة" مشاريع نقل البيانات من خلال السماح بنقل البيانات بشكل تدريجي، ما يقلّل من وقت التوقف عن العمل ويحدّ من الانقطاع.

مستودعات البيانات والتحليلات في الوقت الفعلي

تضمن عملية "تغيير البيانات" تحديث مستودعات البيانات وأنظمة التحليل باستمرار بأحدث التغييرات من قواعد البيانات التشغيلية.

يتيح ذلك للأنشطة التجارية اتّخاذ قرارات استنادًا إلى معلومات في الوقت الفعلي.

الاستعادة من الكوارث وإمكانية الوصول العالية

تتيح ميزة "التقاط البيانات المتغيرة" نسخ البيانات في الوقت الفعلي إلى قواعد بيانات ثانوية لأغراض استرداد البيانات بعد وقوع كارثة. في حال حدوث عطل، تتيح ميزة "التقاط البيانات المتغيرة" إمكانية الانتقال السريع إلى قاعدة بيانات ثانوية، ما يقلّل من فترة التوقف عن العمل وفقدان البيانات.

التدقيق والامتثال

توفّر ميزة "التقاط البيانات المتغيرة" سجلّ تدقيق مفصّلاً لتغييرات البيانات، وهو أمر ضروري للامتثال للمتطلبات التنظيمية.

ما ستنشئه

في هذا الدرس التطبيقي حول الترميز، ستنشئ مسار بيانات لتسجيل البيانات المتغيرة (CDC) باستخدام Cloud Pub/Sub وDataproc وPython وApache Spark. سيتضمّن مسار التعلّم ما يلي:

  • محاكاة تغييرات قاعدة البيانات ونشرها كأحداث في Cloud Pub/Sub، وهي خدمة مراسلة قابلة للتوسّع وموثوقة
  • يمكنك الاستفادة من إمكانات Dataproc، وهي خدمة Spark وHadoop المُدارة من Google Cloud، لمعالجة هذه الأحداث في الوقت الفعلي.

من خلال ربط هذه الخدمات، ستنشئ مسارًا قويًا قادرًا على تسجيل تغييرات البيانات ومعالجتها فور حدوثها، ما يوفّر أساسًا للتحليلات في الوقت الفعلي وتخزين البيانات وغير ذلك من التطبيقات المهمة.

ما ستتعلمه

  • كيفية إنشاء مسار أساسي لتسجيل البيانات المتغيرة
  • Dataproc لمعالجة البث
  • ‫Cloud Pub/Sub للمراسلة في الوقت الفعلي
  • أساسيات Apache Spark

يركّز هذا الدرس التطبيقي حول الترميز على Dataproc وCloud Pub/Sub. يتم تجاهل المفاهيم ومجموعات الرموز غير ذات الصلة، ويتم توفيرها لك لنسخها ولصقها ببساطة.

المتطلبات

  • حساب نشط على Google Cloud Platform تم إعداد مشروع له في حال عدم توفُّر حساب، يمكنك الاشتراك للاستفادة من فترة تجريبية مجانية.
  • تم تثبيت gcloud CLI وإعداده.
  • يجب أن يكون الإصدار 3.7 من Python أو إصدار أحدث مثبّتًا لمحاكاة تغييرات قاعدة البيانات والتفاعل مع Pub/Sub.
  • معرفة أساسية بخدمات Dataproc وCloud Pub/Sub وApache Spark وPython

قبل البدء

نفِّذ الأمر التالي في الوحدة الطرفية لتفعيل واجهات برمجة التطبيقات المطلوبة:

gcloud services enable \
    dataproc.googleapis.com \
    pubsub.googleapis.com \

2. إعداد Cloud Pub/Sub

إنشاء موضوع

سيتم استخدام هذا الموضوع لنشر تغييرات قاعدة البيانات. ستكون مهمة Dataproc هي المستهلك لهذه الرسائل، وستعالج الرسائل من أجل تسجيل البيانات المتغيرة. إذا أردت معرفة المزيد عن المواضيع، يمكنك الاطّلاع على المستندات الرسمية هنا.

gcloud pubsub topics create database-changes

إنشاء اشتراك

أنشئ اشتراكًا سيتم استخدامه لاستهلاك الرسائل في Pub/Sub. لمعرفة المزيد عن الاشتراكات، يمكنك قراءة المستندات الرسمية هنا.

gcloud pubsub subscriptions create --topic database-changes change-subscriber

3- محاكاة تغييرات قاعدة البيانات

الخطوات

  1. أنشئ نصًا برمجيًا بلغة Python (مثل simulate_cdc.py) لمحاكاة تغييرات قاعدة البيانات ونشرها في Pub/Sub.
from google.cloud import pubsub_v1
import json
import time
import random

project_id = "YOUR_PROJECT_ID"  # Replace with your GCP project ID
topic_id = "database-changes"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

def publish_message(data):
    data_str = json.dumps(data).encode("utf-8")
    future = publisher.publish(topic_path, data=data_str)
    print(f"Published message ID: {future.result()}")

def simulate_change():
    change_types = ["INSERT", "UPDATE", "DELETE"]
    change_type = random.choice(change_types)
    record_id = random.randint(1, 100)
    timestamp = time.time()
    change_data = {
        "change_type": change_type,
        "record_id": record_id,
        "timestamp": timestamp,
        "data": {"field1": "value1", "field2": "value2"}, #Place holder data.
    }
    publish_message(change_data)

if __name__ == "__main__":
    while True:
        simulate_change()
        time.sleep(2)  # Simulate changes every 2 seconds

استبدِل YOUR_PROJECT_ID برقم تعريف مشروع Google Cloud Platform الفعلي.

  1. ثبِّت مكتبة برامج Pub/Sub:
pip install google-cloud-pubsub
  1. شغِّل النص البرمجي في نافذة الأوامر. سيتم تشغيل هذا النص البرمجي بشكل مستمر ونشر الرسائل كل ثانيتَين في موضوع Pub/Sub.
python simulate_cdc.py
  1. بعد تشغيل النص البرمجي لمدة دقيقة واحدة مثلاً، ستتوفّر لديك رسائل كافية في Pub/Sub لاستخدامها. يمكنك إنهاء برنامج Python النصي الذي يتم تشغيله بالضغط على ctrl + C أو Cmd + C، حسب نظام التشغيل.
  2. عرض الرسائل المنشورة:

افتح وحدة طرفية أخرى ونفِّذ الأمر التالي لعرض الرسائل المنشورة:

gcloud pubsub subscriptions pull --auto-ack change-subscriber

من المفترض أن يظهر لك صف جدول يحتوي على الرسالة والحقول الأخرى:

{"change_type": "UPDATE", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}

الشرح

  • يحاكي نص Python البرمجي تغييرات قاعدة البيانات من خلال إنشاء أحداث INSERT أو UPDATE أو DELETE بشكل عشوائي.
  • يتم تمثيل كل تغيير كعنصر JSON يحتوي على نوع التغيير ومعرّف السجلّ والطابع الزمني والبيانات.
  • يستخدم النص البرمجي مكتبة برامج Cloud Pub/Sub لنشر أحداث التغيير هذه في الموضوع database-changes.
  • يتيح لك أمر المشترك عرض الرسائل التي يتم إرسالها إلى موضوع النشر/الاشتراك.

4. إنشاء حساب خدمة لـ Dataproc

في هذا القسم، يمكنك إنشاء حساب خدمة يمكن لمجموعة Dataproc استخدامه. يمكنك أيضًا منح الأذونات اللازمة للسماح لمثيلات المجموعة بالوصول إلى Cloud Pub/Sub وDataproc.

  1. أنشئ حساب خدمة:
export SERVICE_ACCOUNT_NAME="dataproc-service-account"

gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
  1. أضِف دور عامل Dataproc للسماح لحساب الخدمة بإنشاء المجموعات وتنفيذ المهام. أضِف رقم تعريف حساب الخدمة الذي تم إنشاؤه في الأمر السابق كعضو في الأمر أدناه:
export PROJECT=$(gcloud info --format='value(config.project)')

gcloud projects add-iam-policy-binding $PROJECT \
        --role roles/dataproc.worker \
        --member="serviceAccount:<your-service-account-with-domain>"
  1. أضِف دور المشترك في Pub/Sub للسماح لحساب الخدمة بالاشتراك في اشتراك Pub/Sub "change-subscriber":
gcloud beta pubsub subscriptions add-iam-policy-binding \
        change-subscriber \
        --role roles/pubsub.subscriber \
        --member="serviceAccount:<your-service-account-with-domain"

5- إنشاء مجموعة Dataproc

ستشغّل مجموعة Dataproc تطبيق Spark الذي سيعالج الرسائل في Pub/Sub. ستحتاج إلى حساب الخدمة الذي تم إنشاؤه في القسم السابق. يخصّص Dataproc حساب الخدمة هذا لكل آلة افتراضية في المجموعة لكي تحصل جميع الآلات الافتراضية على الأذونات الصحيحة لتشغيل التطبيق.

استخدِم الأمر التالي لإنشاء مجموعة Dataproc:

gcloud dataproc clusters create cdc-dataproc-cluster \
    --region=us-central1 \
    --zone=us-central1-a \
    --scopes=pubsub,datastore \
    --image-version=1.3 \
    --service-account="<your-service-account-with-domain-id>"

6. إرسال مهمة Spark إلى مجموعة Dataproc

يعالج تطبيق البث المباشر في Spark رسائل تغيير قاعدة البيانات في Pub/Sub ويطبعها في وحدة التحكّم.

الخطوات

  1. إنشاء دليل وإضافة الرمز المصدري للمستهلك إلى ملف PubsubConsumer.scala
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala 
package demo

import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PubsubConsumer {

  def createContext(projectID: String, checkpointDirectory: String)
    : StreamingContext = {

    // [START stream_setup]
    val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Set the checkpoint directory
    val yarnTags = sparkConf.get("spark.yarn.tags")
    val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
    ssc.checkpoint(checkpointDirectory + '/' + jobId)
    
    // Create stream
    val messagesStream: DStream[String] = PubsubUtils
      .createStream(
        ssc,
        projectID,
        None,
        "change-subscriber",  // Cloud Pub/Sub subscription for incoming database updates
        SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
      .map(message => new String(message.getData(), StandardCharsets.UTF_8))
    // [END stream_setup]

    processStringDStream(messagesStream)
    
        ssc
  }

  def processStringDStream(stringDStream: DStream[String]): Unit = {
    stringDStream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val listOfStrings: List[String] = rdd.collect().toList
        listOfStrings.foreach(str => println(s"message received: $str"))
      } else {
        println("looking for message...")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("arguments are not passed correctly!")
      System.exit(1)
    }

    val Seq(projectID, checkpointDirectory) = args.toSeq

    // Create Spark context
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => createContext(projectID, checkpointDirectory))

    // Start streaming until we receive an explicit termination
    ssc.start()
    ssc.awaitTermination()
  }

}
  1. إنشاء ما يلي وإضافته إلى ملف pom.xml
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <jvm.options.xms>-Xms512M</jvm.options.xms>
    <jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
    <jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
    <jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
  </properties>

  <groupId>dataproc-spark-demos</groupId>
  <artifactId>spark-streaming-pubsub-demo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.2.0</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-datastore</artifactId>
      <version>1.34.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>spark-streaming-pubsub_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>

    <dependency>
      <groupId>org.scalacheck</groupId>
      <artifactId>scalacheck_2.11</artifactId>
      <version>1.14.0</version>
      <scope>test</scope>
    </dependency>

  </dependencies>

  <build>
    <plugins>

      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.3.2</version>
        <executions>
          <execution>
            <id>compile</id>
            <goals>
              <goal>compile</goal>
            </goals>
            <phase>compile</phase>
          </execution>
          <execution>
            <id>test-compile</id>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <phase>test-compile</phase>
          </execution>
          <execution>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>demo.PubsubConsumer</mainClass>
                </transformer>
              </transformers>
              <relocations>
                <relocation>
                  <pattern>com</pattern>
                  <shadedPattern>repackaged.com</shadedPattern>
                  <includes>
                    <include>com.google.protobuf.**</include>
                    <include>com.google.common.**</include>
                  </includes>
                </relocation>
              </relocations>
            </configuration>
          </execution>
        </executions>
      </plugin>

    </plugins>
  </build>

</project>
  1. انتقِل إلى دليل Spark الخاص بالمشروع واحفظ المسار في متغيّر بيئة لاستخدامه لاحقًا:
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
  1. تغيير الدليل:
cd $REPO_ROOT/spark
  1. نزِّل Java 1.8 وضَع المجلد في /usr/lib/jvm/. بعد ذلك، غيِّر JAVA_HOME للإشارة إلى ما يلي:
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
  1. إنشاء ملف JAR للتطبيق
mvn clean package

يتم إنشاء أرشيف spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar الذي يحتوي على رمز التطبيق والتبعيات في الدليل spark/target

  1. أرسِل تطبيق Spark:
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"

gcloud dataproc jobs submit spark \
    --cluster cdc-dataproc-cluster \
    --region us-central1 \
    --async \
    --jar target/$JAR \
    --max-failures-per-hour 10 \
    --properties $SPARK_PROPERTIES \
    -- $ARGUMENTS
  1. اعرض قائمة بالمهام النشطة ودوِّن قيمة JOB_ID للمهمة:
gcloud dataproc jobs list --region=us-central1 --state-filter=active

ستبدو النتيجة مشابهةً لما يلي:

JOB_ID                            TYPE   STATUS
473ecb6d14e2483cb88a18988a5b2e56  spark  RUNNING
  1. يمكنك الاطّلاع على ناتج المهمة من خلال فتح عنوان URL التالي في المتصفّح. استبدِل [JOB_ID] بالقيمة التي تم تدوينها في الخطوة السابقة.
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
  1. يكون الناتج مشابهًا لما يلي:
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...

تسحب مهمة البث المباشر في Spark التي يتم تشغيلها في Dataproc الرسائل من Pub/Sub وتعالجها وتعرض الناتج في وحدة التحكّم.

  1. إنهاء المهمة: شغِّل الأمر التالي لإنهاء المهمة. استبدِل JOB_ID بالمعرّف نفسه الذي ذكرناه سابقًا
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet

تهانينا! لقد أنشأت للتو مسارًا قويًا لتسجيل البيانات المتغيرة (CDC) يعمل على تسجيل تغييرات قاعدة البيانات في Pub/Sub ومعالجتها باستخدام Spark Streaming الذي يعمل في Cloud Dataproc.

7. تَنظيم

نظِّف أي موارد أنشأتها حتى لا يتم تحصيل رسوم منك مقابلها في المستقبل. أسهل طريقة لإيقاف الفوترة هي حذف المشروع الذي أنشأته لتنفيذ البرنامج التعليمي. يمكنك بدلاً من ذلك حذف مراجع فردية.

نفِّذ الأوامر التالية لحذف موارد فردية

gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics delete database-changes --quiet
gcloud pubsub subscriptions delete change-subscriber --quiet
gcloud iam service-accounts delete <your-service-account-with-domain> --quiet

8. تهانينا

تهانينا، لقد أكملت للتو درسًا تطبيقيًا عمليًا يوضّح كيفية إنشاء مسار بيانات قوي في الوقت الفعلي باستخدام Google Cloud Platform. لنلخّص ما أنجزته:

  • محاكاة عملية التقاط البيانات المتغيرة (CDC): تعرّفت على أساسيات عملية التقاط البيانات المتغيرة ونفّذت نصًا برمجيًا بلغة Python لمحاكاة تغييرات قاعدة البيانات، ما أدّى إلى إنشاء أحداث تمثّل تعديلات البيانات في الوقت الفعلي.
  • استخدام Cloud Pub/Sub: يمكنك إعداد مواضيع واشتراكات في Cloud Pub/Sub، ما يوفّر خدمة رسائل قابلة للتوسّع وموثوقة لبث أحداث "التقاط بيانات التغيير". لقد نشرت تغييرات قاعدة البيانات المحاكية إلى Pub/Sub، ما أدّى إلى إنشاء مصدر بيانات في الوقت الفعلي.
  • معالجة البيانات باستخدام Dataproc وSpark: أنشأت مجموعة Dataproc ونشرت مهمة Spark Streaming لاستهلاك الرسائل من اشتراكك في Pub/Sub. لقد عالجت أحداث CDC الواردة وحوّلتها في الوقت الفعلي، وعرضت النتائج في سجلات مهام Dataproc.
  • إنشاء مسار متكامل في الوقت الفعلي: تم دمج هذه الخدمات بنجاح لإنشاء مسار بيانات كامل يرصد تغييرات البيانات ويبثّها ويعالجها في الوقت الفعلي. اكتسبت خبرة عملية في إنشاء نظام يمكنه التعامل مع مصادر البيانات المستمرة.
  • استخدام موصّل Spark Pub/Sub: تمكّنت من إعداد مجموعة Dataproc لاستخدام موصّل Spark Pub/Sub، وهو أمر بالغ الأهمية لكي تتمكّن ميزة "البث المنظَّم في Spark" من قراءة البيانات من Pub/Sub.

لديك الآن أساس قويّ لإنشاء مسارات بيانات أكثر تعقيدًا وتطوّرًا لمختلف التطبيقات، بما في ذلك التحليلات في الوقت الفعلي وتخزين البيانات وبنيات الخدمات المصغّرة. ننصحك بمواصلة الاستكشاف والبناء.

المستندات المرجعية