מידע על Codelab זה
1. מבוא
תאריך עדכון אחרון: 19 ביוני 2025
מהו סימון נתונים שהשתנו (CDC)?
תיעוד שינויים בנתונים (CDC) הוא קבוצה של דפוסי תכנון תוכנה המשמשים לקביעת נתונים שהשתנו במסד נתונים ולמעקב אחריהם. במילים פשוטות יותר, זוהי דרך לתעד שינויים שבוצעו בנתונים כדי שניתן יהיה לשכפל את השינויים האלה במערכות אחרות.
תיעוד שינויים בנתונים (CDC) הוא שימושי מאוד במגוון רחב של תרחישים מבוססי-נתונים, כמו העברת נתונים, ניתוח נתונים ואחסון נתונים בזמן אמת, תוכנית התאוששות מאסון וזמינות גבוהה, ביקורת ותאימות וכו'.
העברת נתונים
בעזרת CDC אפשר להעביר נתונים באופן מצטבר, וכך לפשט פרויקטים של העברת נתונים, לצמצם את זמן ההשבתה ולמזער את השיבושים.
ניתוח ויצירת מחסני נתונים בזמן אמת
בעזרת CDC, מחסני הנתונים ומערכות הניתוח מתעדכנים כל הזמן בשינויים האחרונים ממסדי הנתונים התפעוליים.
כך העסקים יכולים לקבל החלטות על סמך מידע בזמן אמת.
תוכנית התאוששות מאסון (DR) וזמינות גבוהה
בעזרת CDC אפשר ליצור רפליקה של נתונים בזמן אמת במסדי נתונים משניים לצורכי תוכנית התאוששות מאסון (DR). במקרה של כשל, CDC מאפשר מעבר מהיר למסד נתונים משני, וכך מפחית את זמן ההשבתה ואובדן הנתונים.
ביקורת ותאימות
CDC מספק נתיב ביקורת מפורט של שינויים בנתונים, שהוא חיוני לדרישות התאימות והרגולציה.
מה תפַתחו
בקודלאב הזה תלמדו ליצור צינור עיבוד נתונים לסימון נתונים שהשתנו (CDC) באמצעות Cloud Pub/Sub, Dataproc, Python ו-Apache Spark. צינור עיבוד הנתונים יבצע את הפעולות הבאות:
- סימולציה של שינויים במסדי נתונים ופרסום שלהם כאירועים ב-Cloud Pub/Sub, שירות העברת הודעות אמין וניתן להתאמה.
- אתם יכולים להיעזר ב-Dataproc, השירות המנוהל של Google Cloud ל-Spark ול-Hadoop, כדי לעבד את האירועים האלה בזמן אמת.
חיבור השירותים האלה יאפשר לכם ליצור צינור עיבוד נתונים חזק שיכול לתעד ולעבד שינויים בנתונים בזמן אמת, וכך לספק בסיס לניתוח בזמן אמת, לאחסון נתונים ולאפליקציות קריטיות אחרות.
מה תלמדו
- איך יוצרים צינור עיבוד נתונים בסיסי לתיעוד שינויים בנתונים
- Dataproc לעיבוד נתונים בסטרימינג
- Cloud Pub/Sub להעברת הודעות בזמן אמת
- העקרונות הבסיסיים של Apache Spark
סדנת הקוד הזו מתמקדת ב-Dataproc וב-Cloud Pub/Sub. מושגים וקטעי קוד לא רלוונטיים מוצגים בקצרה, וניתן פשוט להעתיק ולהדביק אותם.
מה צריך להכין
- חשבון GCP פעיל עם פרויקט מוגדר. אם אין לכם מינוי, תוכלו להירשם לתקופת ניסיון בחינם.
- CLI של gcloud מותקן ומוגדר.
- Python 3.7 ואילך מותקן כדי לדמות שינויים במסדי נתונים ולקיים אינטראקציה עם Pub/Sub.
- ידע בסיסי ב-Dataproc, ב-Cloud Pub/Sub, ב-Apache Spark וב-Python.
מה צריך לדעת לפני שמתחילים
מריצים את הפקודה הבאה בטרמינל כדי להפעיל את ממשקי ה-API הנדרשים:
gcloud services enable \
dataproc.googleapis.com \
pubsub.googleapis.com \
2. הגדרת Cloud Pub/Sub
יצירת נושא
הנושא הזה ישמש לפרסום השינויים במסד הנתונים. המשימה ב-Dataproc תהיה הצרכן של ההודעות האלה, והיא תעבד את ההודעות לצורך תיעוד של נתוני השינויים. מידע נוסף על Topics זמין כאן.
gcloud pubsub topics create database-changes
יצירת מינוי
יוצרים מינוי שישמש לצריכת ההודעות ב-Pub/Sub. מידע נוסף על מינויים זמין כאן.
gcloud pubsub subscriptions create --topic database-changes change-subscriber
3. סימולציה של שינויים במסד הנתונים
שלבים
- יוצרים סקריפט Python (למשל,
simulate_cdc.py
) כדי לדמות שינויים במסדי נתונים ולפרסם אותם ב-Pub/Sub.
from google.cloud import pubsub_v1
import json
import time
import random
project_id = "YOUR_PROJECT_ID" # Replace with your GCP project ID
topic_id = "database-changes"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
def publish_message(data):
data_str = json.dumps(data).encode("utf-8")
future = publisher.publish(topic_path, data=data_str)
print(f"Published message ID: {future.result()}")
def simulate_change():
change_types = ["INSERT", "UPDATE", "DELETE"]
change_type = random.choice(change_types)
record_id = random.randint(1, 100)
timestamp = time.time()
change_data = {
"change_type": change_type,
"record_id": record_id,
"timestamp": timestamp,
"data": {"field1": "value1", "field2": "value2"}, #Place holder data.
}
publish_message(change_data)
if __name__ == "__main__":
while True:
simulate_change()
time.sleep(2) # Simulate changes every 2 seconds
מחליפים את YOUR_PROJECT_ID במזהה הפרויקט בפועל ב-GCP
- מתקינים את ספריית הלקוח של Pub/Sub:
pip install google-cloud-pubsub
- מריצים את הסקריפט בטרמינל. התסריט הזה יפעל באופן רציף ויפרסם הודעות כל 2 שניות בנושא Pub/Sub.
python simulate_cdc.py
- אחרי שתפעילו את הסקריפט למשך דקה, יהיה לכם מספיק הודעות ב-Pub/Sub לשימוש. כדי לסיים את הסקריפט של Python שפועל, מקישים על Ctrl + C או על Cmd + C, בהתאם למערכת ההפעלה.
- הצגת ההודעות שפורסמו:
פותחים טרמינל אחר ומריצים את הפקודה הבאה כדי להציג את ההודעות שפורסמו:
gcloud pubsub subscriptions pull --auto-ack change-subscriber
אמורה להופיע שורה בטבלה שמכילה את ההודעה ושדות אחרים:
{"change_type": "UPDATE", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}
הסבר
- סקריפט Python מדמה שינויים במסד נתונים על ידי יצירת אירועים מסוג
INSERT
,UPDATE
אוDELETE
באופן אקראי. - כל שינוי מיוצג כאובייקט JSON שמכיל את סוג השינוי, מזהה הרשומה, חותמת הזמן והנתונים.
- הסקריפט משתמש בספריית הלקוח של Cloud Pub/Sub כדי לפרסם את אירועי השינוי האלה בנושא
database-changes
. - הפקודה subscriber מאפשרת לכם לראות את ההודעות שנשלחות לנושא ה-Pub/Sub.
4. יצירת חשבון שירות ל-Dataproc
בקטע הזה יוצרים חשבון שירות שאפשר להשתמש בו באשכול Dataproc. בנוסף, מקצים את ההרשאות הנדרשות כדי לאפשר למכונות באשכול לגשת ל-Cloud Pub/Sub ול-Dataproc.
- יוצרים חשבון שירות:
export SERVICE_ACCOUNT_NAME="dataproc-service-account"
gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
- מוסיפים את התפקיד 'עובד' ב-Dataproc כדי לאפשר לחשבון השירות ליצור אשכולות ולהריץ משימות. מוסיפים את מזהה חשבון השירות שנוצר בפקודה הקודמת כחבר בפקודה הבאה:
export PROJECT=$(gcloud info --format='value(config.project)')
gcloud projects add-iam-policy-binding $PROJECT \
--role roles/dataproc.worker \
--member="serviceAccount:<your-service-account-with-domain>"
- מוסיפים את התפקיד 'מנוי Pub/Sub' כדי לאפשר לחשבון השירות להירשם למינוי Pub/Sub בשם 'change-subscriber':
gcloud beta pubsub subscriptions add-iam-policy-binding \
change-subscriber \
--role roles/pubsub.subscriber \
--member="serviceAccount:<your-service-account-with-domain"
5. יצירת אשכול Dataproc
האפליקציה של Spark תפעל באשכולות Dataproc ותטפל בהודעות ב-Pub/Sub. תצטרכו את חשבון השירות שיצרתם בקטע הקודם. מערכת Dataproc מקצה את חשבון השירות הזה לכל מכונה באשכול, כדי שכל המכונות יקבלו את ההרשאות הנכונות להרצת האפליקציה.
כדי ליצור אשכול Dataproc, משתמשים בפקודה הבאה:
gcloud dataproc clusters create cdc-dataproc-cluster \
--region=us-central1 \
--zone=us-central1-a \
--scopes=pubsub,datastore \
--image-version=1.3 \
--service-account="<your-service-account-with-domain-id>"
6. שליחת משימה של Spark לאשכול Dataproc
אפליקציית ה-Spark Streaming מעבדת את הודעות השינוי של מסדי הנתונים ב-Pub/Sub ומדפיסה אותן במסוף.
שלבים
- יוצרים ספרייה ומוסיפים את קוד המקור של הצרכן לקובץ PubsubConsumer.scala
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala
package demo
import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object PubsubConsumer {
def createContext(projectID: String, checkpointDirectory: String)
: StreamingContext = {
// [START stream_setup]
val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
val ssc = new StreamingContext(sparkConf, Seconds(1))
// Set the checkpoint directory
val yarnTags = sparkConf.get("spark.yarn.tags")
val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
ssc.checkpoint(checkpointDirectory + '/' + jobId)
// Create stream
val messagesStream: DStream[String] = PubsubUtils
.createStream(
ssc,
projectID,
None,
"change-subscriber", // Cloud Pub/Sub subscription for incoming database updates
SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
.map(message => new String(message.getData(), StandardCharsets.UTF_8))
// [END stream_setup]
processStringDStream(messagesStream)
ssc
}
def processStringDStream(stringDStream: DStream[String]): Unit = {
stringDStream.foreachRDD { rdd =>
if (!rdd.isEmpty()) {
val listOfStrings: List[String] = rdd.collect().toList
listOfStrings.foreach(str => println(s"message received: $str"))
} else {
println("looking for message...")
}
}
}
def main(args: Array[String]): Unit = {
if (args.length != 2) {
System.err.println("arguments are not passed correctly!")
System.exit(1)
}
val Seq(projectID, checkpointDirectory) = args.toSeq
// Create Spark context
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
() => createContext(projectID, checkpointDirectory))
// Start streaming until we receive an explicit termination
ssc.start()
ssc.awaitTermination()
}
}
- יוצרים את הקובץ הבא ומוסיפים אותו לקובץ pom.xml
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<jvm.options.xms>-Xms512M</jvm.options.xms>
<jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
<jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
<jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
</properties>
<groupId>dataproc-spark-demos</groupId>
<artifactId>spark-streaming-pubsub-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-datastore</artifactId>
<version>1.34.0</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>spark-streaming-pubsub_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_2.11</artifactId>
<version>1.14.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.3.2</version>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
</goals>
<phase>compile</phase>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
<phase>test-compile</phase>
</execution>
<execution>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>demo.PubsubConsumer</mainClass>
</transformer>
</transformers>
<relocations>
<relocation>
<pattern>com</pattern>
<shadedPattern>repackaged.com</shadedPattern>
<includes>
<include>com.google.protobuf.**</include>
<include>com.google.common.**</include>
</includes>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
- עוברים לספריית Spark של הפרויקט ושומרים את הנתיב במשתנה סביבה לשימוש מאוחר יותר:
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
- משנים את הספרייה:
cd $REPO_ROOT/spark
- מורידים את Java 1.8 וממקמים את התיקייה ב-/usr/lib/jvm/. לאחר מכן משנים את JAVA_HOME כך שיצביע לכתובת הזו:
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
- פיתוח קובץ ה-jar של האפליקציה
mvn clean package
הארכיון spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar שמכיל את קוד האפליקציה ואת יחסי התלות נוצר בספרייה spark/target
- שולחים את בקשת ה-Spark:
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"
gcloud dataproc jobs submit spark \
--cluster cdc-dataproc-cluster \
--region us-central1 \
--async \
--jar target/$JAR \
--max-failures-per-hour 10 \
--properties $SPARK_PROPERTIES \
-- $ARGUMENTS
- מציגים את רשימת המשימות הפעילות ומתעדים את הערך של
JOB_ID
למשימה:
gcloud dataproc jobs list --region=us-central1 --state-filter=active
הפלט אמור להיראות כך:
JOB_ID TYPE STATUS
473ecb6d14e2483cb88a18988a5b2e56 spark RUNNING
- כדי להציג את הפלט של המשימה, פותחים את כתובת ה-URL הבאה בדפדפן. מחליפים את [JOB_ID] בערך שצוין בשלב הקודם.
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
- הפלט אמור להיראות כך:
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
המשימה של Spark Streaming שפועלת ב-Dataproc שולפת הודעות מ-Pub/Sub, מעבדת אותן ומציגה את הפלט במסוף.
- סיום המשימה: מריצים את הפקודה הבאה כדי לסיים את המשימה. מחליפים את JOB_ID באותו מזהה שציינו קודם
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet
מזל טוב! יצרתם צינור עיבוד נתונים חזק ל-CDC, שמתעד את השינויים במסד הנתונים ב-Pub/Sub ומעבד אותם באמצעות Spark Streaming שפועל ב-Cloud Dataproc.
7. הסרת המשאבים
חשוב להסיר את המשאבים שיצרתם כדי שלא תחויבו עליהם בעתיד. הדרך הקלה ביותר לבטל את החיוב היא למחוק את הפרויקט שיצרתם בשביל המדריך. לחלופין, אפשר למחוק משאבים ספציפיים.
כדי למחוק משאבים ספציפיים, מריצים את הפקודות הבאות:
gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics delete database-changes --quiet
gcloud pubsub subscriptions delete change-subscriber --quiet
gcloud iam service-accounts delete <your-service-account-with-domain> --quiet
8. מזל טוב
מזל טוב, סיימת את Codelab המעשי הזה, שבו תלמד איך ליצור צינור עיבוד נתונים חזק בזמן אמת באמצעות Google Cloud Platform. זהו סיכום של מה שהשלמתם:
- סימולציה של סימון נתונים שהשתנו (CDC): למדתם את העקרונות הבסיסיים של CDC והטמעתם סקריפט Python כדי לדמות שינויים במסד הנתונים, וליצור אירועים שמייצגים שינויים בנתונים בזמן אמת.
- שימוש ב-Cloud Pub/Sub: אתם מגדירים נושאים ומינויים ב-Cloud Pub/Sub, ומקבלים שירות העברת הודעות אמין וניתן להתאמה לעומס, שמאפשר סטרימינג של אירועי ה-CDC. פרסמתם את השינויים הדומים למציאות במסד הנתונים ב-Pub/Sub, ויצרת מקור נתונים בזמן אמת.
- נתונים שעברו עיבוד באמצעות Dataproc ו-Spark: הקצית אשכול Dataproc ופרסת משימה של Spark Streaming כדי לצרוך הודעות מהמינויים שלך ל-Pub/Sub. עיבדתם והעברתם את אירועי ה-CDC הנכנסים בזמן אמת, והצגתם את התוצאות ביומני המשימות של Dataproc.
- יצירת צינור עיבוד נתונים מקצה לקצה בזמן אמת: שילבתם בהצלחה את השירותים האלה כדי ליצור צינור עיבוד נתונים מלא שמתעד שינויים בנתונים, מעביר אותם בסטרימינג ומעבד אותם בזמן אמת. צברתם ניסיון מעשי בפיתוח מערכת שיכולה לטפל במקורות נתונים רצופיים.
- השתמשתם במחבר Spark Pub/Sub: הגדרתם בהצלחה אשכול Dataproc כך שישתמש במחבר Spark Pub/Sub, שהוא חיוני כדי ש-Spark Structured Streaming יוכל לקרוא נתונים מ-Pub/Sub.
עכשיו יש לכם בסיס יציב ליצירת צינורות עיבוד נתונים מורכבים ומתוחכמים יותר לאפליקציות שונות, כולל ניתוח נתונים בזמן אמת, מחסני נתונים וארכיטקטורות של מיקרו-שירותים. המשיכו לחקור ולפתח!