Créer une capture de données modifiées à l'aide de Dataproc et Cloud Pub/Sub

1. Introduction

df8070bd84336207.png

Dernière mise à jour:19/06/2025

Qu'est-ce que la capture des données modifiées ?

La capture de données modifiées (CDC, Change Data Capture) est un ensemble de modèles de conception logicielle permettant de déterminer et de suivre les données modifiées dans une base de données. En termes plus simples, il s'agit d'un moyen de capturer et d'enregistrer les modifications apportées aux données afin qu'elles puissent être répliquées dans d'autres systèmes.

La capture des données modifiées (CDC) est extrêmement utile dans un large éventail de scénarios basés sur les données, tels que la migration de données, l'entrepôt de données et l'analyse en temps réel, la reprise après sinistre et la haute disponibilité, l'audit et la conformité, etc.

Migration de données

Le CDC simplifie les projets de migration de données en permettant un transfert de données incrémentiel, ce qui réduit les temps d'arrêt et minimise les perturbations.

Entrepôt de données et analyse en temps réel

La capture des données modifiées garantit que les entrepôts de données et les systèmes d'analyse sont constamment mis à jour avec les dernières modifications apportées aux bases de données opérationnelles.

Les entreprises peuvent ainsi prendre des décisions en fonction d'informations en temps réel.

Reprise après sinistre et haute disponibilité

La CDC permet de répliquer des données en temps réel dans des bases de données secondaires à des fins de reprise après sinistre. En cas de défaillance, la CDC permet de basculer rapidement vers une base de données secondaire, ce qui réduit au maximum les temps d'arrêt et les pertes de données.

Audit et conformité

Le CDC fournit un journal d'audit détaillé des modifications apportées aux données, ce qui est essentiel pour respecter les exigences réglementaires.

Objectifs de l'atelier

Dans cet atelier de programmation, vous allez créer un pipeline de données de capture des données modifiées (CDC, Change Data Capture) à l'aide de Cloud Pub/Sub, Dataproc, Python et Apache Spark. Votre pipeline va:

  • Simulez les modifications de la base de données et publiez-les en tant qu'événements dans Cloud Pub/Sub, un service de messagerie évolutif et fiable.
  • Exploitez la puissance de Dataproc, le service Spark et Hadoop géré de Google Cloud, pour traiter ces événements en temps réel.

En connectant ces services, vous créerez un pipeline robuste capable de capturer et de traiter les modifications de données au fur et à mesure qu'elles se produisent. Vous disposerez ainsi d'une base pour l'analyse en temps réel, le stockage de données et d'autres applications critiques.

Points abordés

  • Créer un pipeline de capture des données de modification de base
  • Dataproc pour le traitement par flux
  • Cloud Pub/Sub pour la messagerie en temps réel
  • Principes de base d'Apache Spark

Cet atelier de programmation est consacré à Dataproc et Cloud Pub/Sub. Les concepts et les blocs de codes non pertinents ne sont pas abordés, et vous sont fournis afin que vous puissiez simplement les copier et les coller.

Prérequis

  • un compte GCP actif avec un projet configuré. Si vous n'en avez pas, vous pouvez vous inscrire à un essai sans frais.
  • gcloud CLI installée et configurée.
  • Python 3.7 ou version ultérieure installée pour simuler les modifications de la base de données et interagir avec Pub/Sub
  • Connaissances de base sur Dataproc, Cloud Pub/Sub, Apache Spark et Python

Avant de commencer

Exécutez la commande suivante dans le terminal pour activer les API requises:

gcloud services enable \
    dataproc.googleapis.com \
    pubsub.googleapis.com \

2. Configurer Cloud Pub/Sub

Créer un sujet

Ce sujet sera utilisé pour publier les modifications de la base de données. La tâche Dataproc sera le client de ces messages et les traitera pour la capture des données de modification. Pour en savoir plus sur ces sujets, consultez la documentation officielle.

gcloud pubsub topics create database-changes

Créer un abonnement

Créez un abonnement qui servira à consommer les messages dans Pub/Sub. Pour en savoir plus sur les abonnements, consultez la documentation officielle.

gcloud pubsub subscriptions create --topic database-changes change-subscriber

3. Simuler des modifications de base de données

Étapes

  1. Créez un script Python (par exemple, simulate_cdc.py) pour simuler les modifications de la base de données et les publier dans Pub/Sub.
from google.cloud import pubsub_v1
import json
import time
import random

project_id = "YOUR_PROJECT_ID"  # Replace with your GCP project ID
topic_id = "database-changes"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

def publish_message(data):
    data_str = json.dumps(data).encode("utf-8")
    future = publisher.publish(topic_path, data=data_str)
    print(f"Published message ID: {future.result()}")

def simulate_change():
    change_types = ["INSERT", "UPDATE", "DELETE"]
    change_type = random.choice(change_types)
    record_id = random.randint(1, 100)
    timestamp = time.time()
    change_data = {
        "change_type": change_type,
        "record_id": record_id,
        "timestamp": timestamp,
        "data": {"field1": "value1", "field2": "value2"}, #Place holder data.
    }
    publish_message(change_data)

if __name__ == "__main__":
    while True:
        simulate_change()
        time.sleep(2)  # Simulate changes every 2 seconds

Remplacez YOUR_PROJECT_ID par l'ID de votre projet GCP

  1. Installez la bibliothèque cliente Pub/Sub:
pip install google-cloud-pubsub
  1. Exécutez le script dans votre terminal. Ce script s'exécute en continu et publie des messages toutes les deux secondes sur le sujet Pub/Sub.
python simulate_cdc.py
  1. Après avoir exécuté le script pendant environ une minute, vous aurez suffisamment de messages dans Pub/Sub à consommer. Vous pouvez arrêter le script Python en cours d'exécution en appuyant sur Ctrl+C ou Cmd+C, selon votre OS.
  2. Afficher les messages publiés:

Ouvrez un autre terminal et exécutez la commande suivante pour afficher les messages publiés:

gcloud pubsub subscriptions pull --auto-ack change-subscriber

Une ligne de tableau contenant le message et d'autres champs doit s'afficher:

{"change_type": "UPDATE", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}

Explication

  • Le script Python simule les modifications de la base de données en générant de manière aléatoire des événements INSERT, UPDATE ou DELETE.
  • Chaque modification est représentée sous la forme d'un objet JSON contenant le type de modification, l'ID de l'enregistrement, le code temporel et les données.
  • Le script utilise la bibliothèque cliente Cloud Pub/Sub pour publier ces événements de modification dans le sujet database-changes.
  • La commande "subscriber" vous permet d'afficher les messages envoyés au sujet Pub/Sub.

4. Créer un compte de service pour Dataproc

Dans cette section, vous allez créer un compte de service que le cluster Dataproc peut utiliser. Vous attribuez également les autorisations nécessaires pour autoriser les instances de cluster à accéder à Cloud Pub/Sub et Dataproc.

  1. Créez un compte de service :
export SERVICE_ACCOUNT_NAME="dataproc-service-account"

gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
  1. Ajoutez le rôle de nœud de calcul Dataproc pour autoriser le compte de service à créer des clusters et à exécuter des tâches. Ajoutez l'ID du compte de service généré dans la commande précédente en tant que membre dans la commande ci-dessous:
export PROJECT=$(gcloud info --format='value(config.project)')

gcloud projects add-iam-policy-binding $PROJECT \
        --role roles/dataproc.worker \
        --member="serviceAccount:<your-service-account-with-domain>"
  1. Ajoutez le rôle d'abonné Pub/Sub pour permettre au compte de service de s'abonner à l'abonnement Pub/Sub "change-subscriber" :
gcloud beta pubsub subscriptions add-iam-policy-binding \
        change-subscriber \
        --role roles/pubsub.subscriber \
        --member="serviceAccount:<your-service-account-with-domain"

5. Créer un cluster Dataproc

Le cluster Dataproc exécutera l'application Spark, qui traitera les messages dans Pub/Sub. Vous aurez besoin du compte de service créé dans la section précédente. Dataproc attribue ce compte de service à chaque instance du cluster afin que toutes les instances disposent des autorisations appropriées pour exécuter l'application.

Utilisez la commande suivante pour créer un cluster Dataproc:

gcloud dataproc clusters create cdc-dataproc-cluster \
    --region=us-central1 \
    --zone=us-central1-a \
    --scopes=pubsub,datastore \
    --image-version=1.3 \
    --service-account="<your-service-account-with-domain-id>"

6. Envoyer la tâche Spark au cluster Dataproc

L'application de streaming Spark traite les messages de modification de la base de données dans Pub/Sub et les imprime dans la console.

Étapes

  1. Créer un répertoire et ajouter le code source du client au fichier PubsubConsumer.scala
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala 
package demo

import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PubsubConsumer {

  def createContext(projectID: String, checkpointDirectory: String)
    : StreamingContext = {

    // [START stream_setup]
    val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Set the checkpoint directory
    val yarnTags = sparkConf.get("spark.yarn.tags")
    val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
    ssc.checkpoint(checkpointDirectory + '/' + jobId)
    
    // Create stream
    val messagesStream: DStream[String] = PubsubUtils
      .createStream(
        ssc,
        projectID,
        None,
        "change-subscriber",  // Cloud Pub/Sub subscription for incoming database updates
        SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
      .map(message => new String(message.getData(), StandardCharsets.UTF_8))
    // [END stream_setup]

    processStringDStream(messagesStream)
    
        ssc
  }

  def processStringDStream(stringDStream: DStream[String]): Unit = {
    stringDStream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val listOfStrings: List[String] = rdd.collect().toList
        listOfStrings.foreach(str => println(s"message received: $str"))
      } else {
        println("looking for message...")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("arguments are not passed correctly!")
      System.exit(1)
    }

    val Seq(projectID, checkpointDirectory) = args.toSeq

    // Create Spark context
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => createContext(projectID, checkpointDirectory))

    // Start streaming until we receive an explicit termination
    ssc.start()
    ssc.awaitTermination()
  }

}
  1. Créez et ajoutez le code suivant au fichier pom.xml
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <jvm.options.xms>-Xms512M</jvm.options.xms>
    <jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
    <jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
    <jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
  </properties>

  <groupId>dataproc-spark-demos</groupId>
  <artifactId>spark-streaming-pubsub-demo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.2.0</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-datastore</artifactId>
      <version>1.34.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>spark-streaming-pubsub_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>

    <dependency>
      <groupId>org.scalacheck</groupId>
      <artifactId>scalacheck_2.11</artifactId>
      <version>1.14.0</version>
      <scope>test</scope>
    </dependency>

  </dependencies>

  <build>
    <plugins>

      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.3.2</version>
        <executions>
          <execution>
            <id>compile</id>
            <goals>
              <goal>compile</goal>
            </goals>
            <phase>compile</phase>
          </execution>
          <execution>
            <id>test-compile</id>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <phase>test-compile</phase>
          </execution>
          <execution>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>demo.PubsubConsumer</mainClass>
                </transformer>
              </transformers>
              <relocations>
                <relocation>
                  <pattern>com</pattern>
                  <shadedPattern>repackaged.com</shadedPattern>
                  <includes>
                    <include>com.google.protobuf.**</include>
                    <include>com.google.common.**</include>
                  </includes>
                </relocation>
              </relocations>
            </configuration>
          </execution>
        </executions>
      </plugin>

    </plugins>
  </build>

</project>
  1. Accédez au répertoire Spark du projet, puis enregistrez le chemin dans une variable d'environnement à utiliser ultérieurement:
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
  1. Changez de répertoire:
cd $REPO_ROOT/spark
  1. Téléchargez Java 1.8 et placez le dossier dans /usr/lib/jvm/. Modifiez ensuite JAVA_HOME pour qu'il pointe vers:
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
  1. Créer le fichier JAR de l'application
mvn clean package

L'archive spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar contenant le code et les dépendances de l'application est créée dans le répertoire spark/target.

  1. Envoyez l'application Spark:
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"

gcloud dataproc jobs submit spark \
    --cluster cdc-dataproc-cluster \
    --region us-central1 \
    --async \
    --jar target/$JAR \
    --max-failures-per-hour 10 \
    --properties $SPARK_PROPERTIES \
    -- $ARGUMENTS
  1. Affichez la liste des tâches actives, puis notez la valeur JOB_ID de la tâche :
gcloud dataproc jobs list --region=us-central1 --state-filter=active

Le résultat ressemblera à ceci :

JOB_ID                            TYPE   STATUS
473ecb6d14e2483cb88a18988a5b2e56  spark  RUNNING
  1. Affichez le résultat de la tâche en ouvrant l'URL suivante dans le navigateur. Remplacez [JOB_ID] par la valeur notée à l'étape précédente.
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
  1. Le résultat ressemble à ce qui suit :
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...

La tâche de streaming Spark exécutée dans Dataproc extrait les messages de Pub/Sub, les traite et affiche la sortie dans la console.

  1. Arrêter la tâche: exécutez la commande suivante pour arrêter la tâche. Remplacez JOB_ID par le même que celui que vous avez noté précédemment.
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet

Félicitations ! Vous venez de créer un pipeline CDC puissant qui capture les modifications de la base de données dans Pub/Sub et les traite à l'aide du streaming Spark exécuté dans Cloud Dataproc.

7. Effectuer un nettoyage

Nettoyez toutes les ressources que vous avez créées pour qu'elles ne vous soient plus facturées à l'avenir. Le moyen le plus simple d'empêcher la facturation est de supprimer le projet que vous avez créé pour ce tutoriel. Vous pouvez également supprimer des ressources individuelles.

Exécutez les commandes suivantes pour supprimer des ressources individuelles.

gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics delete database-changes --quiet
gcloud pubsub subscriptions delete change-subscriber --quiet
gcloud iam service-accounts delete <your-service-account-with-domain> --quiet

8. Félicitations

Félicitations, vous venez de terminer un atelier de programmation pratique qui vous montre comment créer un pipeline de données en temps réel robuste à l'aide de Google Cloud Platform. Récapitulons ce que vous avez accompli:

  • Capture de données modifiées (CDC) simulée : vous avez appris les principes de base de la capture de données modifiées et implémenté un script Python pour simuler les modifications de la base de données, en générant des événements représentant les modifications de données en temps réel.
  • Utilisation de Cloud Pub/Sub:vous configurez des sujets et des abonnements Cloud Pub/Sub, ce qui vous permet de bénéficier d'un service de messagerie évolutif et fiable pour diffuser vos événements CDC. Vous avez publié vos modifications de base de données simulées dans Pub/Sub, créant ainsi un flux de données en temps réel.
  • Données traitées avec Dataproc et Spark:vous avez provisionné un cluster Dataproc et déployé une tâche Spark Streaming pour consommer les messages de votre abonnement Pub/Sub. Vous avez traité et transformé les événements CDC entrants en temps réel, et affiché les résultats dans les journaux de vos jobs Dataproc.
  • Vous avez créé un pipeline de données en temps réel de bout en bout:vous avez intégré ces services pour créer un pipeline de données complet qui capture, diffuse et traite les modifications de données en temps réel. Vous avez acquis une expérience pratique dans la création d'un système capable de gérer des flux de données continus.
  • Vous avez utilisé le connecteur Spark Pub/Sub:vous avez correctement configuré un cluster Dataproc pour utiliser le connecteur Spark Pub/Sub, qui est essentiel pour que Spark Structured Streaming puisse lire les données de Pub/Sub.

Vous disposez désormais d'une base solide pour créer des pipelines de données plus complexes et sophistiqués pour diverses applications, y compris l'analyse en temps réel, le stockage de données et les architectures de microservices. Continuez à explorer et à créer !

Documents de référence