1. Einführung
Letzte Aktualisierung:19. Juni 2025
Was ist Change Data Capture?
Change Data Capture (CDC) besteht aus einer Reihe von Softwaredesignmustern zum Ermitteln und Verfolgen von Daten, die sich in einer Datenbank geändert haben. Einfacher ausgedrückt: Es ist eine Möglichkeit, Änderungen an Daten zu erfassen und aufzuzeichnen, damit diese Änderungen in anderen Systemen repliziert werden können.
Change Data Capture (CDC) ist in einer Vielzahl von datengetriebenen Szenarien äußerst nützlich, z. B. bei der Datenmigration, bei Echtzeit-Data-Warehousing und ‑Analysen, bei Notfallwiederherstellung und Hochverfügbarkeit sowie bei Audits und Compliance.
Datenmigration
CDC vereinfacht Datenmigrationsprojekte, da eine inkrementelle Datenübertragung möglich ist, was Ausfallzeiten reduziert und Unterbrechungen minimiert.
Data-Warehouse-Prozess und Analytics in Echtzeit
Mit CDC werden Data Warehouses und Analysesysteme kontinuierlich mit den neuesten Änderungen aus operativen Datenbanken aktualisiert.
So können Unternehmen Entscheidungen auf der Grundlage von Echtzeitdaten treffen.
Notfallwiederherstellung und Hochverfügbarkeit
Mit CDC können Daten zur Notfallwiederherstellung in Echtzeit in sekundären Datenbanken repliziert werden. Bei einem Ausfall ermöglicht CDC ein schnelles Failover auf eine sekundäre Datenbank, wodurch Ausfallzeiten und Datenverluste minimiert werden.
Audit und Compliance
CDC bietet einen detaillierten Audit-Trail für Datenänderungen, was für die Einhaltung von Compliance- und behördlichen Anforderungen unerlässlich ist.
Umfang
In diesem Codelab erstellen Sie mit Cloud Pub/Sub, Dataproc, Python und Apache Spark eine Datenpipeline für die Änderungsdatenerfassung (Change Data Capture, CDC). Ihre Pipeline bietet folgende Vorteile:
- Datenbankänderungen simulieren und als Ereignisse in Cloud Pub/Sub veröffentlichen, einem skalierbaren und zuverlässigen Messaging-Dienst
- Mit Dataproc, dem verwalteten Spark- und Hadoop-Dienst von Google Cloud, können Sie diese Ereignisse in Echtzeit verarbeiten.
Wenn Sie diese Dienste verbinden, erhalten Sie eine robuste Pipeline, mit der sich Datenänderungen in Echtzeit erfassen und verarbeiten lassen. Dies bildet die Grundlage für Echtzeitanalysen, Data Warehousing und andere wichtige Anwendungen.
Lerninhalte
- Eine einfache Pipeline zum Erfassen von Änderungsdaten erstellen
- Dataproc für die Streamverarbeitung
- Cloud Pub/Sub für Echtzeit-Messaging
- Grundlagen von Apache Spark
In diesem Codelab geht es um Dataproc und Cloud Pub/Sub. Auf irrelevante Konzepte wird nicht genauer eingegangen und entsprechende Codeblöcke können Sie einfach kopieren und einfügen.
Voraussetzungen
- ein aktives GCP-Konto mit einem eingerichteten Projekt. Wenn du noch kein Konto hast, kannst du dich für einen kostenlosen Testzeitraum registrieren.
- Die gcloud CLI muss installiert und konfiguriert sein.
- Python 3.7 oder höher, um Datenbankänderungen zu simulieren und mit Pub/Sub zu interagieren
- Grundlegende Kenntnisse von Dataproc, Cloud Pub/Sub, Apache Spark und Python
Vorbereitung
Führen Sie im Terminal den folgenden Befehl aus, um die erforderlichen APIs zu aktivieren:
gcloud services enable \
dataproc.googleapis.com \
pubsub.googleapis.com \
2. Cloud Pub/Sub einrichten
Thema erstellen
Über dieses Thema werden die Datenbankänderungen veröffentlicht. Der Dataproc-Job ist der Empfänger dieser Nachrichten und verarbeitet sie für die Erfassung von Änderungsdaten. Weitere Informationen zu den einzelnen Themen findest du in der offiziellen Dokumentation.
gcloud pubsub topics create database-changes
Abo erstellen
Erstellen Sie ein Abo, mit dem die Nachrichten in Pub/Sub abgerufen werden. Weitere Informationen zu Abos finden Sie in der offiziellen Dokumentation.
gcloud pubsub subscriptions create --topic database-changes change-subscriber
3. Datenbankänderungen simulieren
Schritte
- Erstellen Sie ein Python-Script (z.B.
simulate_cdc.py
) können Sie Datenbankänderungen simulieren und in Pub/Sub veröffentlichen.
from google.cloud import pubsub_v1
import json
import time
import random
project_id = "YOUR_PROJECT_ID" # Replace with your GCP project ID
topic_id = "database-changes"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
def publish_message(data):
data_str = json.dumps(data).encode("utf-8")
future = publisher.publish(topic_path, data=data_str)
print(f"Published message ID: {future.result()}")
def simulate_change():
change_types = ["INSERT", "UPDATE", "DELETE"]
change_type = random.choice(change_types)
record_id = random.randint(1, 100)
timestamp = time.time()
change_data = {
"change_type": change_type,
"record_id": record_id,
"timestamp": timestamp,
"data": {"field1": "value1", "field2": "value2"}, #Place holder data.
}
publish_message(change_data)
if __name__ == "__main__":
while True:
simulate_change()
time.sleep(2) # Simulate changes every 2 seconds
Ersetzen Sie YOUR_PROJECT_ID durch Ihre tatsächliche GCP-Projekt-ID.
- Installieren Sie die Pub/Sub-Clientbibliothek:
pip install google-cloud-pubsub
- Führen Sie das Script in Ihrem Terminal aus. Dieses Script wird kontinuierlich ausgeführt und veröffentlicht alle 2 Sekunden Nachrichten an das Pub/Sub-Thema.
python simulate_cdc.py
- Nachdem das Script etwa eine Minute lang ausgeführt wurde, sind genügend Nachrichten in Pub/Sub verfügbar, um sie zu verarbeiten. Sie können das laufende Python-Script beenden, indem Sie je nach Betriebssystem Strg + C oder Cmd + C drücken.
- Veröffentlichte Nachrichten ansehen:
Öffnen Sie ein anderes Terminal und führen Sie den folgenden Befehl aus, um die veröffentlichten Nachrichten aufzurufen:
gcloud pubsub subscriptions pull --auto-ack change-subscriber
Es sollte eine Tabellenzeile mit der Nachricht und anderen Feldern angezeigt werden:
{"change_type": "UPDATE", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}
Erklärung
- Das Python-Script simuliert Datenbankänderungen, indem zufällig
INSERT
-,UPDATE
- oderDELETE
-Ereignisse generiert werden. - Jede Änderung wird als JSON-Objekt dargestellt, das den Änderungstyp, die Datensatz-ID, den Zeitstempel und die Daten enthält.
- Das Script verwendet die Cloud Pub/Sub-Clientbibliothek, um diese Änderungsereignisse im Thema
database-changes
zu veröffentlichen. - Mit dem Befehl „subscriber“ können Sie sich die Nachrichten ansehen, die an das Pub/Sub-Thema gesendet werden.
4. Dienstkonto für Dataproc erstellen
In diesem Abschnitt erstellen Sie ein Dienstkonto, das vom Dataproc-Cluster verwendet werden kann. Sie weisen außerdem die Berechtigungen zu, damit die Clusterinstanzen auf Cloud Pub/Sub und Dataproc zugreifen können.
- Erstellen Sie ein Dienstkonto:
export SERVICE_ACCOUNT_NAME="dataproc-service-account"
gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
- Fügen Sie die Rolle „Dataproc-Worker“ hinzu, damit das Dienstkonto Cluster erstellen und Jobs ausführen kann. Fügen Sie die im vorherigen Befehl generierte Dienstkonto-ID als Mitglied in den folgenden Befehl ein:
export PROJECT=$(gcloud info --format='value(config.project)')
gcloud projects add-iam-policy-binding $PROJECT \
--role roles/dataproc.worker \
--member="serviceAccount:<your-service-account-with-domain>"
- Fügen Sie die Rolle „Pub/Sub-Abonnent“ hinzu, damit das Dienstkonto das Pub/Sub-Abo „change-subscriber“ abonnieren kann:
gcloud beta pubsub subscriptions add-iam-policy-binding \
change-subscriber \
--role roles/pubsub.subscriber \
--member="serviceAccount:<your-service-account-with-domain"
5. Dataproc-Cluster erstellen
Der Dataproc-Cluster führt die Spark-Anwendung aus, die die Nachrichten in Pub/Sub verarbeitet. Sie benötigen das im vorherigen Abschnitt erstellte Dienstkonto. Dataproc weist dieses Dienstkonto jeder Instanz im Cluster zu, sodass alle Instanzen die passenden Berechtigungen zum Ausführen der Anwendung erhalten.
Verwenden Sie den folgenden Befehl, um einen Dataproc-Cluster zu erstellen:
gcloud dataproc clusters create cdc-dataproc-cluster \
--region=us-central1 \
--zone=us-central1-a \
--scopes=pubsub,datastore \
--image-version=1.3 \
--service-account="<your-service-account-with-domain-id>"
6. Spark-Job an den Dataproc-Cluster senden
Die Spark-Streaming-App verarbeitet die Datenbankänderungsnachrichten in Pub/Sub und druckt sie in der Konsole aus.
Schritte
- Erstellen Sie ein Verzeichnis und fügen Sie der Datei „PubsubConsumer.scala“ den Quellcode des Verbrauchers hinzu.
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala
package demo
import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object PubsubConsumer {
def createContext(projectID: String, checkpointDirectory: String)
: StreamingContext = {
// [START stream_setup]
val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
val ssc = new StreamingContext(sparkConf, Seconds(1))
// Set the checkpoint directory
val yarnTags = sparkConf.get("spark.yarn.tags")
val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
ssc.checkpoint(checkpointDirectory + '/' + jobId)
// Create stream
val messagesStream: DStream[String] = PubsubUtils
.createStream(
ssc,
projectID,
None,
"change-subscriber", // Cloud Pub/Sub subscription for incoming database updates
SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
.map(message => new String(message.getData(), StandardCharsets.UTF_8))
// [END stream_setup]
processStringDStream(messagesStream)
ssc
}
def processStringDStream(stringDStream: DStream[String]): Unit = {
stringDStream.foreachRDD { rdd =>
if (!rdd.isEmpty()) {
val listOfStrings: List[String] = rdd.collect().toList
listOfStrings.foreach(str => println(s"message received: $str"))
} else {
println("looking for message...")
}
}
}
def main(args: Array[String]): Unit = {
if (args.length != 2) {
System.err.println("arguments are not passed correctly!")
System.exit(1)
}
val Seq(projectID, checkpointDirectory) = args.toSeq
// Create Spark context
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
() => createContext(projectID, checkpointDirectory))
// Start streaming until we receive an explicit termination
ssc.start()
ssc.awaitTermination()
}
}
- Erstellen Sie Folgendes und fügen Sie es der Datei pom.xml hinzu.
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<jvm.options.xms>-Xms512M</jvm.options.xms>
<jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
<jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
<jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
</properties>
<groupId>dataproc-spark-demos</groupId>
<artifactId>spark-streaming-pubsub-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-datastore</artifactId>
<version>1.34.0</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>spark-streaming-pubsub_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_2.11</artifactId>
<version>1.14.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.3.2</version>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
</goals>
<phase>compile</phase>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
<phase>test-compile</phase>
</execution>
<execution>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>demo.PubsubConsumer</mainClass>
</transformer>
</transformers>
<relocations>
<relocation>
<pattern>com</pattern>
<shadedPattern>repackaged.com</shadedPattern>
<includes>
<include>com.google.protobuf.**</include>
<include>com.google.common.**</include>
</includes>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
- Wechseln Sie zum Spark-Verzeichnis des Projekts und speichern Sie den Pfad in einer Umgebungsvariablen, die später verwendet werden soll:
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
- Ändern Sie das Verzeichnis:
cd $REPO_ROOT/spark
- Laden Sie Java 1.8 herunter und legen Sie den Ordner unter /usr/lib/jvm/ ab. Ändern Sie dann JAVA_HOME so, dass es auf Folgendes verweist:
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
- JAR-Datei der Anwendung erstellen
mvn clean package
Das Archiv „spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar“ mit dem Anwendungscode und den Abhängigkeiten wird im Verzeichnis spark/target
erstellt.
- Reichen Sie die Spark-Anwendung ein:
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"
gcloud dataproc jobs submit spark \
--cluster cdc-dataproc-cluster \
--region us-central1 \
--async \
--jar target/$JAR \
--max-failures-per-hour 10 \
--properties $SPARK_PROPERTIES \
-- $ARGUMENTS
- Zeigen Sie die Liste der aktiven Jobs an und notieren Sie sich den jeweiligen
JOB_ID
-Wert:
gcloud dataproc jobs list --region=us-central1 --state-filter=active
Die Ausgabe sieht in etwa so aus:
JOB_ID TYPE STATUS
473ecb6d14e2483cb88a18988a5b2e56 spark RUNNING
- Zeigen Sie die Jobausgabe an, indem Sie die folgende URL im Browser öffnen. Ersetzen Sie [JOB_ID] durch den im vorherigen Schritt notierten Wert.
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
- Die Ausgabe sieht etwa so aus:
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
Der Spark-Streamingjob, der in Dataproc ausgeführt wird, ruft Nachrichten von Pub/Sub ab, verarbeitet sie und zeigt die Ausgabe in der Konsole an.
- Job beenden: Führen Sie den folgenden Befehl aus, um den Job zu beenden. Ersetzen Sie JOB_ID durch die zuvor notierte Job-ID.
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet
Glückwunsch! Sie haben gerade eine leistungsstarke CDC-Pipeline erstellt, die die Datenbankänderungen in Pub/Sub erfasst und mit Spark Streaming in Cloud Dataproc verarbeitet.
7. Bereinigen
Bereinigen Sie alle erstellten Ressourcen, damit sie Ihnen nicht in Rechnung gestellt werden. Am einfachsten vermeiden Sie weitere Kosten, indem Sie das für die Anleitung erstellte Projekt löschen. Alternativ können Sie einzelne Ressourcen löschen.
Führen Sie die folgenden Befehle aus, um einzelne Ressourcen zu löschen:
gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics delete database-changes --quiet
gcloud pubsub subscriptions delete change-subscriber --quiet
gcloud iam service-accounts delete <your-service-account-with-domain> --quiet
8. Glückwunsch
Herzlichen Glückwunsch! Sie haben gerade ein praktisches Codelab abgeschlossen, in dem gezeigt wird, wie Sie mit der Google Cloud Platform eine robuste Echtzeit-Datenpipeline erstellen. Fassen wir noch einmal zusammen, was Sie erreicht haben:
- Simuliertes Change Data Capture (CDC): Sie haben die Grundlagen von CDC kennengelernt und ein Python-Script implementiert, um Datenbankänderungen zu simulieren und Ereignisse zu generieren, die Echtzeitdatenänderungen darstellen.
- Cloud Pub/Sub:Sie richten Cloud Pub/Sub-Themen und ‑Abos ein und erhalten so einen skalierbaren und zuverlässigen Messaging-Dienst zum Streamen Ihrer CDC-Ereignisse. Sie haben Ihre simulierten Datenbankänderungen in Pub/Sub veröffentlicht und so einen Echtzeitdatenstream erstellt.
- Mit Dataproc und Spark verarbeitete Daten:Sie haben einen Dataproc-Cluster bereitgestellt und einen Spark-Streaming-Job bereitgestellt, um Nachrichten aus Ihrem Pub/Sub-Abo zu verarbeiten. Sie haben die eingehenden CDC-Ereignisse in Echtzeit verarbeitet und transformiert und die Ergebnisse in Ihren Dataproc-Jobprotokollen angezeigt.
- End-to-End-Echtzeit-Pipeline erstellt:Sie haben diese Dienste erfolgreich integriert, um eine vollständige Datenpipeline zu erstellen, die Datenänderungen in Echtzeit erfasst, streamt und verarbeitet. Sie haben praktische Erfahrung beim Erstellen eines Systems gesammelt, das kontinuierliche Datenstreams verarbeiten kann.
- Spark Pub/Sub-Connector verwendet:Sie haben einen Dataproc-Cluster für die Verwendung des Spark Pub/Sub-Connectors konfiguriert. Dieser ist für das Lesen von Daten aus Pub/Sub mit Spark Structured Streaming erforderlich.
Sie haben jetzt eine solide Grundlage für die Erstellung komplexerer und ausgefeilterer Datenpipelines für verschiedene Anwendungen, einschließlich Echtzeitanalysen, Data Warehouses und Mikrodienstarchitekturen. Viel Spaß beim Entdecken und Erstellen!