Datenerfassung mit Dataproc und Cloud Pub/Sub erstellen

1. Einführung

df8070bd84336207.png

Zuletzt aktualisiert: 19.06.2025

Was ist Change Data Capture?

Change Data Capture (CDC) besteht aus einer Reihe von Softwaredesignmustern zum Ermitteln und Verfolgen von Daten, die sich in einer Datenbank geändert haben. Einfacher ausgedrückt: Es ist eine Methode zum Erfassen und Aufzeichnen von Änderungen an Daten, damit diese Änderungen in andere Systeme übertragen werden können.

Change Data Capture (CDC) ist in einer Vielzahl von datengesteuerten Szenarien wie Datenmigration, Echtzeit-Data Warehousing und ‑Analysen, Notfallwiederherstellung und Hochverfügbarkeit, Audit und Compliance usw. äußerst nützlich.

Datenmigration

CDC vereinfacht Datenmigrationsprojekte, da eine inkrementelle Datenübertragung möglich ist. So werden Ausfallzeiten und Unterbrechungen minimiert.

Data Warehousing und Analysen in Echtzeit

CDC sorgt dafür, dass Data Warehouses und Analysesysteme ständig mit den neuesten Änderungen aus operativen Datenbanken aktualisiert werden.

So können Unternehmen Entscheidungen auf Grundlage von Echtzeitinformationen treffen.

Notfallwiederherstellung und Hochverfügbarkeit

CDC ermöglicht die Echtzeitreplikation von Daten in sekundäre Datenbanken für die Notfallwiederherstellung. Im Falle eines Fehlers ermöglicht CDC ein schnelles Failover auf eine sekundäre Datenbank, wodurch Ausfallzeiten und Datenverlust minimiert werden.

Audit und Compliance

CDC bietet einen detaillierten Prüfpfad für Datenänderungen, der für die Einhaltung von Vorschriften und behördlichen Anforderungen unerlässlich ist.

Umfang

In diesem Codelab erstellen Sie eine CDC-Datenpipeline (Change Data Capture) mit Cloud Pub/Sub, Dataproc, Python und Apache Spark. Ihre Pipeline:

  • Simulieren Sie Datenbankänderungen und veröffentlichen Sie sie als Ereignisse in Cloud Pub/Sub, einem skalierbaren und zuverlässigen Messaging-Dienst.
  • Nutzen Sie die Leistungsfähigkeit von Dataproc, dem verwalteten Spark- und Hadoop-Dienst von Google Cloud, um diese Ereignisse in Echtzeit zu verarbeiten.

Durch die Verbindung dieser Dienste erstellen Sie eine robuste Pipeline, mit der Datenänderungen erfasst und verarbeitet werden können, sobald sie eintreten. So schaffen Sie die Grundlage für Echtzeitanalysen, Data Warehousing und andere wichtige Anwendungen.

Lerninhalte

In diesem Codelab geht es um Dataproc und Cloud Pub/Sub. Auf irrelevante Konzepte wird nicht genauer eingegangen und entsprechende Codeblöcke können Sie einfach kopieren und einfügen.

Voraussetzungen

  • ein aktives GCP-Konto mit einem eingerichteten Projekt. Wenn du noch kein Konto hast, kannst du dich für einen kostenlosen Testzeitraum registrieren.
  • Die gcloud CLI muss installiert und konfiguriert sein.
  • Python 3.7+ ist installiert, um Datenbankänderungen zu simulieren und mit Pub/Sub zu interagieren.
  • Grundkenntnisse von Dataproc, Cloud Pub/Sub, Apache Spark und Python.

Vorbereitung

Führen Sie im Terminal den folgenden Befehl aus, um die erforderlichen APIs zu aktivieren:

gcloud services enable \
    dataproc.googleapis.com \
    pubsub.googleapis.com \

2. Cloud Pub/Sub einrichten

Thema erstellen

Dieses Thema wird verwendet, um die Datenbankänderungen zu veröffentlichen. Der Dataproc-Job ist der Empfänger dieser Nachrichten und verarbeitet sie für Change Data Capture. Weitere Informationen zu Themen finden Sie in der offiziellen Dokumentation.

gcloud pubsub topics create database-changes

Abo erstellen

Erstellen Sie ein Abo, mit dem die Nachrichten in Pub/Sub genutzt werden. Weitere Informationen zu Abos finden Sie in der offiziellen Dokumentation.

gcloud pubsub subscriptions create --topic database-changes change-subscriber

3. Datenbankänderungen simulieren

Schritte

  1. Erstellen Sie ein Python-Skript (z.B. simulate_cdc.py), um Datenbankänderungen zu simulieren und in Pub/Sub zu veröffentlichen.
from google.cloud import pubsub_v1
import json
import time
import random

project_id = "YOUR_PROJECT_ID"  # Replace with your GCP project ID
topic_id = "database-changes"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

def publish_message(data):
    data_str = json.dumps(data).encode("utf-8")
    future = publisher.publish(topic_path, data=data_str)
    print(f"Published message ID: {future.result()}")

def simulate_change():
    change_types = ["INSERT", "UPDATE", "DELETE"]
    change_type = random.choice(change_types)
    record_id = random.randint(1, 100)
    timestamp = time.time()
    change_data = {
        "change_type": change_type,
        "record_id": record_id,
        "timestamp": timestamp,
        "data": {"field1": "value1", "field2": "value2"}, #Place holder data.
    }
    publish_message(change_data)

if __name__ == "__main__":
    while True:
        simulate_change()
        time.sleep(2)  # Simulate changes every 2 seconds

Ersetzen Sie YOUR_PROJECT_ID durch Ihre tatsächliche GCP-Projekt-ID.

  1. Installieren Sie die Pub/Sub-Clientbibliothek:
pip install google-cloud-pubsub
  1. Führen Sie das Skript im Terminal aus. Dieses Skript wird kontinuierlich ausgeführt und veröffentlicht alle 2 Sekunden Nachrichten im Pub/Sub-Thema.
python simulate_cdc.py
  1. Nachdem Sie das Skript etwa eine Minute lang ausgeführt haben, sind genügend Nachrichten in Pub/Sub vorhanden, die Sie nutzen können. Sie können das laufende Python-Skript beenden, indem Sie je nach Betriebssystem Strg + C oder Cmd + C drücken.
  2. Veröffentlichte Nachrichten ansehen:

Öffnen Sie ein weiteres Terminal und führen Sie den folgenden Befehl aus, um die veröffentlichten Nachrichten aufzurufen:

gcloud pubsub subscriptions pull --auto-ack change-subscriber

Es sollte eine Tabellenzeile mit der Nachricht und anderen Feldern angezeigt werden:

{"change_type": "UPDATE", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}

Erklärung

  • Das Python-Skript simuliert Datenbankänderungen, indem es zufällig INSERT-, UPDATE- oder DELETE-Ereignisse generiert.
  • Jede Änderung wird als JSON-Objekt dargestellt, das den Änderungstyp, die Datensatz-ID, den Zeitstempel und die Daten enthält.
  • Das Skript verwendet die Cloud Pub/Sub-Clientbibliothek, um diese Änderungsereignisse im Thema database-changes zu veröffentlichen.
  • Mit dem Abonnentenbefehl können Sie die Nachrichten ansehen, die an das Pub/Sub-Thema gesendet werden.

4. Dienstkonto für Dataproc erstellen

In diesem Abschnitt erstellen Sie ein Dienstkonto, das vom Dataproc-Cluster verwendet werden kann. Sie weisen außerdem die Berechtigungen zu, damit die Clusterinstanzen auf Cloud Pub/Sub und Dataproc zugreifen können.

  1. Erstellen Sie ein Dienstkonto:
export SERVICE_ACCOUNT_NAME="dataproc-service-account"

gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
  1. Fügen Sie die Dataproc-Worker-Rolle hinzu, damit das Dienstkonto Cluster erstellen und Jobs ausführen kann. Fügen Sie die im vorherigen Befehl generierte Dienstkonto-ID als Mitglied in den folgenden Befehl ein:
export PROJECT=$(gcloud info --format='value(config.project)')

gcloud projects add-iam-policy-binding $PROJECT \
        --role roles/dataproc.worker \
        --member="serviceAccount:<your-service-account-with-domain>"
  1. Fügen Sie die Rolle „Pub/Sub-Abonnent“ hinzu, damit über das Dienstkonto das Pub/Sub-Abo „change-subscriber“ abonniert werden kann:
gcloud beta pubsub subscriptions add-iam-policy-binding \
        change-subscriber \
        --role roles/pubsub.subscriber \
        --member="serviceAccount:<your-service-account-with-domain"

5. Dataproc-Cluster erstellen

Auf dem Dataproc-Cluster wird die Spark-Anwendung ausgeführt, die die Nachrichten in Pub/Sub verarbeitet. Sie benötigen das Dienstkonto, das Sie im vorherigen Abschnitt erstellt haben. Dataproc weist dieses Dienstkonto jeder Instanz im Cluster zu, sodass alle Instanzen die passenden Berechtigungen zum Ausführen der Anwendung erhalten.

Verwenden Sie den folgenden Befehl, um einen Dataproc-Cluster zu erstellen:

gcloud dataproc clusters create cdc-dataproc-cluster \
    --region=us-central1 \
    --zone=us-central1-a \
    --scopes=pubsub,datastore \
    --image-version=1.3 \
    --service-account="<your-service-account-with-domain-id>"

6. Spark-Job an den Dataproc-Cluster senden

Die Spark Streaming-App verarbeitet die Datenbankänderungsnachrichten in Pub/Sub und gibt sie in der Konsole aus.

Schritte

  1. Verzeichnis erstellen und Quellcode des Consumers in die Datei „PubsubConsumer.scala“ einfügen
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala 
package demo

import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PubsubConsumer {

  def createContext(projectID: String, checkpointDirectory: String)
    : StreamingContext = {

    // [START stream_setup]
    val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Set the checkpoint directory
    val yarnTags = sparkConf.get("spark.yarn.tags")
    val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
    ssc.checkpoint(checkpointDirectory + '/' + jobId)
    
    // Create stream
    val messagesStream: DStream[String] = PubsubUtils
      .createStream(
        ssc,
        projectID,
        None,
        "change-subscriber",  // Cloud Pub/Sub subscription for incoming database updates
        SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
      .map(message => new String(message.getData(), StandardCharsets.UTF_8))
    // [END stream_setup]

    processStringDStream(messagesStream)
    
        ssc
  }

  def processStringDStream(stringDStream: DStream[String]): Unit = {
    stringDStream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val listOfStrings: List[String] = rdd.collect().toList
        listOfStrings.foreach(str => println(s"message received: $str"))
      } else {
        println("looking for message...")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("arguments are not passed correctly!")
      System.exit(1)
    }

    val Seq(projectID, checkpointDirectory) = args.toSeq

    // Create Spark context
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => createContext(projectID, checkpointDirectory))

    // Start streaming until we receive an explicit termination
    ssc.start()
    ssc.awaitTermination()
  }

}
  1. Erstellen Sie Folgendes und fügen Sie es der Datei pom.xml hinzu:
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <jvm.options.xms>-Xms512M</jvm.options.xms>
    <jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
    <jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
    <jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
  </properties>

  <groupId>dataproc-spark-demos</groupId>
  <artifactId>spark-streaming-pubsub-demo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.2.0</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-datastore</artifactId>
      <version>1.34.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>spark-streaming-pubsub_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>

    <dependency>
      <groupId>org.scalacheck</groupId>
      <artifactId>scalacheck_2.11</artifactId>
      <version>1.14.0</version>
      <scope>test</scope>
    </dependency>

  </dependencies>

  <build>
    <plugins>

      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.3.2</version>
        <executions>
          <execution>
            <id>compile</id>
            <goals>
              <goal>compile</goal>
            </goals>
            <phase>compile</phase>
          </execution>
          <execution>
            <id>test-compile</id>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <phase>test-compile</phase>
          </execution>
          <execution>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>demo.PubsubConsumer</mainClass>
                </transformer>
              </transformers>
              <relocations>
                <relocation>
                  <pattern>com</pattern>
                  <shadedPattern>repackaged.com</shadedPattern>
                  <includes>
                    <include>com.google.protobuf.**</include>
                    <include>com.google.common.**</include>
                  </includes>
                </relocation>
              </relocations>
            </configuration>
          </execution>
        </executions>
      </plugin>

    </plugins>
  </build>

</project>
  1. Wechseln Sie zum Spark-Verzeichnis des Projekts und speichern Sie den Pfad in einer Umgebungsvariable, die später verwendet wird:
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
  1. Wechseln Sie das Verzeichnis:
cd $REPO_ROOT/spark
  1. Laden Sie Java 1.8 herunter und legen Sie den Ordner in /usr/lib/jvm/ ab. Ändern Sie dann JAVA_HOME so, dass es auf Folgendes verweist:
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
  1. JAR-Datei für die Anwendung erstellen
mvn clean package

Das Archiv spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar mit dem Anwendungscode und den Abhängigkeiten wird im Verzeichnis spark/target erstellt.

  1. Senden Sie die Spark-Anwendung:
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"

gcloud dataproc jobs submit spark \
    --cluster cdc-dataproc-cluster \
    --region us-central1 \
    --async \
    --jar target/$JAR \
    --max-failures-per-hour 10 \
    --properties $SPARK_PROPERTIES \
    -- $ARGUMENTS
  1. Zeigen Sie die Liste der aktiven Jobs an und notieren Sie sich den jeweiligen JOB_ID-Wert:
gcloud dataproc jobs list --region=us-central1 --state-filter=active

Die Ausgabe sieht etwa so aus:

JOB_ID                            TYPE   STATUS
473ecb6d14e2483cb88a18988a5b2e56  spark  RUNNING
  1. Zeigen Sie die Jobausgabe an, indem Sie die folgende URL im Browser öffnen. Ersetzen Sie [JOB_ID] durch den im vorherigen Schritt notierten Wert.
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
  1. Die Ausgabe sieht etwa so aus:
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...

Der in Dataproc ausgeführte Spark-Streamingjob ruft Nachrichten aus Pub/Sub ab, verarbeitet sie und gibt die Ausgabe in der Konsole aus.

  1. Job beenden: Führen Sie den folgenden Befehl aus, um den Job zu beenden. Ersetzen Sie JOB_ID durch die Job-ID, die Sie zuvor notiert haben.
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet

Glückwunsch! Sie haben gerade eine leistungsstarke CDC-Pipeline erstellt, die die Datenbankänderungen in Pub/Sub erfasst und mit Spark-Streaming in Cloud Dataproc verarbeitet.

7. Bereinigen

Bereinigen Sie alle erstellten Ressourcen, damit sie Ihnen nicht in Rechnung gestellt werden. Am einfachsten vermeiden Sie weitere Kosten, indem Sie das für die Anleitung erstellte Projekt löschen. Alternativ können Sie einzelne Ressourcen löschen.

Führen Sie die folgenden Befehle aus, um einzelne Ressourcen zu löschen.

gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics delete database-changes --quiet
gcloud pubsub subscriptions delete change-subscriber --quiet
gcloud iam service-accounts delete <your-service-account-with-domain> --quiet

8. Glückwunsch

Herzlichen Glückwunsch! Sie haben gerade ein Codelab abgeschlossen, in dem gezeigt wird, wie Sie mit der Google Cloud Platform eine robuste Echtzeit-Datenpipeline erstellen. Fassen wir noch einmal zusammen, was Sie gelernt haben:

  • Simuliertes Change Data Capture (CDC): Sie haben die Grundlagen von CDC kennengelernt und ein Python-Skript implementiert, um Datenbankänderungen zu simulieren und Ereignisse zu generieren, die Echtzeitdatenänderungen darstellen.
  • Cloud Pub/Sub genutzt:Sie richten Cloud Pub/Sub-Themen und -Abos ein und stellen so einen skalierbaren und zuverlässigen Messaging-Dienst für das Streamen Ihrer CDC-Ereignisse bereit. Sie haben Ihre simulierten Datenbankänderungen in Pub/Sub veröffentlicht und so einen Echtzeitdatenstream erstellt.
  • Verarbeitete Daten mit Dataproc und Spark:Sie haben einen Dataproc-Cluster bereitgestellt und einen Spark Streaming-Job bereitgestellt, um Nachrichten aus Ihrem Pub/Sub-Abo zu nutzen. Sie haben die eingehenden CDC-Ereignisse in Echtzeit verarbeitet und transformiert und die Ergebnisse in den Dataproc-Joblogs angezeigt.
  • End-to-End-Echtzeitpipeline erstellt:Sie haben diese Dienste erfolgreich integriert, um eine vollständige Datenpipeline zu erstellen, die Datenänderungen in Echtzeit erfasst, streamt und verarbeitet. Sie haben praktische Erfahrungen beim Erstellen eines Systems gesammelt, das kontinuierliche Datenstreams verarbeiten kann.
  • Spark Pub/Sub-Connector verwendet:Sie haben einen Dataproc-Cluster erfolgreich für die Verwendung des Spark Pub/Sub-Connectors konfiguriert. Dieser ist entscheidend dafür, dass Spark Structured Streaming Daten aus Pub/Sub lesen kann.

Sie haben jetzt eine solide Grundlage für die Entwicklung komplexerer und anspruchsvollerer Datenpipelines für verschiedene Anwendungen, darunter Echtzeitanalysen, Data Warehousing und Mikrodienstarchitekturen. Viel Spaß beim Erkunden und Erstellen!

Referenzdokumente