Créer une capture de données modifiées à l'aide de Dataproc et Cloud Pub/Sub

1. Introduction

df8070bd84336207.png

Dernière mise à jour : 19/06/2025

Qu'est-ce que la capture des données modifiées ?

La capture de données modifiées (CDC, Change Data Capture) est un ensemble de modèles de conception logicielle permettant de déterminer et de suivre les données qui ont été modifiées dans une base de données. En d'autres termes, il s'agit d'une méthode permettant de capturer et d'enregistrer les modifications apportées aux données afin qu'elles puissent être répliquées dans d'autres systèmes.

La capture des données modifiées (CDC) est extrêmement utile dans un large éventail de scénarios axés sur les données, comme la migration de données, l'entreposage et l'analyse de données en temps réel, la reprise après sinistre et la haute disponibilité, l'audit et la conformité, etc.

Migration de données

CDC simplifie les projets de migration de données en permettant le transfert incrémentiel des données, ce qui réduit les temps d'arrêt et minimise les perturbations.

Entrepôt de données et analyse en temps réel

La CDC garantit que les entrepôts de données et les systèmes analytiques sont constamment mis à jour avec les dernières modifications apportées aux bases de données opérationnelles.

Les entreprises peuvent ainsi prendre des décisions en fonction d'informations en temps réel.

Reprise après sinistre et haute disponibilité

La CDC permet la réplication des données en temps réel vers des bases de données secondaires à des fins de reprise après sinistre. En cas de défaillance, la CDC permet un basculement rapide vers une base de données secondaire, ce qui minimise les temps d'arrêt et les pertes de données.

Audit et conformité

La CDC fournit une piste d'audit détaillée des modifications apportées aux données, ce qui est essentiel pour la conformité et les exigences réglementaires.

Objectifs de l'atelier

Dans cet atelier de programmation, vous allez créer un pipeline de données de capture des données modifiées (CDC) à l'aide de Cloud Pub/Sub, Dataproc, Python et Apache Spark. Votre pipeline :

  • Simulez les modifications de la base de données et publiez-les en tant qu'événements dans Cloud Pub/Sub, un service de messagerie évolutif et fiable.
  • Exploitez la puissance de Dataproc, le service géré Spark et Hadoop de Google Cloud, pour traiter ces événements en temps réel.

En connectant ces services, vous créerez un pipeline robuste capable de capturer et de traiter les modifications de données au fur et à mesure qu'elles se produisent. Vous disposerez ainsi d'une base pour l'analyse en temps réel, l'entreposage de données et d'autres applications critiques.

Points abordés

  • Créer un pipeline de capture des données modifiées de base
  • Dataproc pour le traitement des flux
  • Cloud Pub/Sub pour la messagerie en temps réel
  • Principes de base d'Apache Spark

Cet atelier de programmation est consacré à Dataproc et Cloud Pub/Sub. Les concepts et les blocs de codes non pertinents ne sont pas abordés, et vous sont fournis afin que vous puissiez simplement les copier et les coller.

Prérequis

  • un compte GCP actif avec un projet configuré. Si vous n'en avez pas, vous pouvez vous inscrire à un essai sans frais.
  • gcloud CLI est installé et configuré.
  • Python 3.7 ou version ultérieure installé pour simuler les modifications de la base de données et interagir avec Pub/Sub.
  • Connaissances de base de Dataproc, Cloud Pub/Sub, Apache Spark et Python.

Avant de commencer

Exécutez la commande suivante dans le terminal pour activer les API requises :

gcloud services enable \
    dataproc.googleapis.com \
    pubsub.googleapis.com \

2. Configurer Cloud Pub/Sub

Créer un sujet

Ce sujet sera utilisé pour publier les modifications apportées à la base de données. Le job Dataproc sera le consommateur de ces messages et les traitera pour la capture des données modifiées. Pour en savoir plus sur les thèmes, consultez la documentation officielle ici.

gcloud pubsub topics create database-changes

Créer un abonnement

Créez un abonnement qui sera utilisé pour consommer les messages dans Pub/Sub. Pour en savoir plus sur les abonnements, consultez la documentation officielle ici.

gcloud pubsub subscriptions create --topic database-changes change-subscriber

3. Simuler des modifications de base de données

Étapes

  1. Créez un script Python (par exemple, simulate_cdc.py) pour simuler les modifications de la base de données et les publier dans Pub/Sub.
from google.cloud import pubsub_v1
import json
import time
import random

project_id = "YOUR_PROJECT_ID"  # Replace with your GCP project ID
topic_id = "database-changes"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

def publish_message(data):
    data_str = json.dumps(data).encode("utf-8")
    future = publisher.publish(topic_path, data=data_str)
    print(f"Published message ID: {future.result()}")

def simulate_change():
    change_types = ["INSERT", "UPDATE", "DELETE"]
    change_type = random.choice(change_types)
    record_id = random.randint(1, 100)
    timestamp = time.time()
    change_data = {
        "change_type": change_type,
        "record_id": record_id,
        "timestamp": timestamp,
        "data": {"field1": "value1", "field2": "value2"}, #Place holder data.
    }
    publish_message(change_data)

if __name__ == "__main__":
    while True:
        simulate_change()
        time.sleep(2)  # Simulate changes every 2 seconds

Remplacez YOUR_PROJECT_ID par l'ID de votre projet GCP.

  1. Installez la bibliothèque cliente Pub/Sub :
pip install google-cloud-pubsub
  1. Exécutez le script dans votre terminal. Ce script s'exécutera en continu et publiera des messages toutes les deux secondes sur le sujet Pub/Sub.
python simulate_cdc.py
  1. Après avoir exécuté le script pendant une minute, par exemple, vous aurez suffisamment de messages dans Pub/Sub pour les consommer. Vous pouvez arrêter le script Python en cours d'exécution en appuyant sur Ctrl+C ou Cmd+C, selon votre système d'exploitation.
  2. Afficher les messages publiés :

Ouvrez un autre terminal et exécutez la commande suivante pour afficher les messages publiés :

gcloud pubsub subscriptions pull --auto-ack change-subscriber

Une ligne de tableau contenant le message et d'autres champs doit s'afficher :

{"change_type": "UPDATE", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}

Explication

  • Le script Python simule les modifications de la base de données en générant aléatoirement des événements INSERT, UPDATE ou DELETE.
  • Chaque modification est représentée sous la forme d'un objet JSON contenant le type de modification, l'ID de l'enregistrement, le code temporel et les données.
  • Le script utilise la bibliothèque cliente Cloud Pub/Sub pour publier ces événements de modification dans le sujet database-changes.
  • La commande d'abonné vous permet d'afficher les messages envoyés au sujet Pub/Sub.

4. Créer un compte de service pour Dataproc

Dans cette section, vous allez créer un compte de service que le cluster Dataproc pourra utiliser. Vous attribuez également les autorisations nécessaires pour que les instances de cluster puissent accéder à Cloud Pub/Sub et Dataproc.

  1. Créez un compte de service :
export SERVICE_ACCOUNT_NAME="dataproc-service-account"

gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
  1. Ajoutez le rôle de nœud de calcul Dataproc pour permettre au compte de service de créer des clusters et d'exécuter des tâches. Ajoutez l'ID du compte de service généré dans la commande précédente en tant que membre dans la commande ci-dessous :
export PROJECT=$(gcloud info --format='value(config.project)')

gcloud projects add-iam-policy-binding $PROJECT \
        --role roles/dataproc.worker \
        --member="serviceAccount:<your-service-account-with-domain>"
  1. Ajoutez le rôle d'abonné Pub/Sub pour permettre au compte de service de s'abonner à l'abonnement Pub/Sub "change-subscriber" :
gcloud beta pubsub subscriptions add-iam-policy-binding \
        change-subscriber \
        --role roles/pubsub.subscriber \
        --member="serviceAccount:<your-service-account-with-domain"

5. Créer un cluster Dataproc

Le cluster Dataproc exécutera l'application Spark qui traitera les messages dans Pub/Sub. Vous aurez besoin du compte de service créé dans la section précédente. Dataproc attribue ce compte de service à chaque instance du cluster afin que toutes les instances disposent des autorisations appropriées pour exécuter l'application.

Utilisez la commande suivante pour créer un cluster Dataproc :

gcloud dataproc clusters create cdc-dataproc-cluster \
    --region=us-central1 \
    --zone=us-central1-a \
    --scopes=pubsub,datastore \
    --image-version=1.3 \
    --service-account="<your-service-account-with-domain-id>"

6. Envoyer le job Spark au cluster Dataproc

L'application de streaming Spark traite les messages de modification de la base de données dans Pub/Sub et les affiche dans la console.

Étapes

  1. Créez un répertoire et ajoutez le code source du consommateur au fichier PubsubConsumer.scala.
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala 
package demo

import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PubsubConsumer {

  def createContext(projectID: String, checkpointDirectory: String)
    : StreamingContext = {

    // [START stream_setup]
    val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Set the checkpoint directory
    val yarnTags = sparkConf.get("spark.yarn.tags")
    val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
    ssc.checkpoint(checkpointDirectory + '/' + jobId)
    
    // Create stream
    val messagesStream: DStream[String] = PubsubUtils
      .createStream(
        ssc,
        projectID,
        None,
        "change-subscriber",  // Cloud Pub/Sub subscription for incoming database updates
        SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
      .map(message => new String(message.getData(), StandardCharsets.UTF_8))
    // [END stream_setup]

    processStringDStream(messagesStream)
    
        ssc
  }

  def processStringDStream(stringDStream: DStream[String]): Unit = {
    stringDStream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val listOfStrings: List[String] = rdd.collect().toList
        listOfStrings.foreach(str => println(s"message received: $str"))
      } else {
        println("looking for message...")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("arguments are not passed correctly!")
      System.exit(1)
    }

    val Seq(projectID, checkpointDirectory) = args.toSeq

    // Create Spark context
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => createContext(projectID, checkpointDirectory))

    // Start streaming until we receive an explicit termination
    ssc.start()
    ssc.awaitTermination()
  }

}
  1. Créez le fichier pom.xml et ajoutez-y le contenu suivant :
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <jvm.options.xms>-Xms512M</jvm.options.xms>
    <jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
    <jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
    <jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
  </properties>

  <groupId>dataproc-spark-demos</groupId>
  <artifactId>spark-streaming-pubsub-demo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.2.0</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-datastore</artifactId>
      <version>1.34.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>spark-streaming-pubsub_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>

    <dependency>
      <groupId>org.scalacheck</groupId>
      <artifactId>scalacheck_2.11</artifactId>
      <version>1.14.0</version>
      <scope>test</scope>
    </dependency>

  </dependencies>

  <build>
    <plugins>

      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.3.2</version>
        <executions>
          <execution>
            <id>compile</id>
            <goals>
              <goal>compile</goal>
            </goals>
            <phase>compile</phase>
          </execution>
          <execution>
            <id>test-compile</id>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <phase>test-compile</phase>
          </execution>
          <execution>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>demo.PubsubConsumer</mainClass>
                </transformer>
              </transformers>
              <relocations>
                <relocation>
                  <pattern>com</pattern>
                  <shadedPattern>repackaged.com</shadedPattern>
                  <includes>
                    <include>com.google.protobuf.**</include>
                    <include>com.google.common.**</include>
                  </includes>
                </relocation>
              </relocations>
            </configuration>
          </execution>
        </executions>
      </plugin>

    </plugins>
  </build>

</project>
  1. Accédez au répertoire Spark du projet et enregistrez le chemin dans une variable d'environnement en vue de l'utiliser ultérieurement :
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
  1. Changez de répertoire :
cd $REPO_ROOT/spark
  1. Téléchargez Java 1.8 et placez le dossier dans /usr/lib/jvm/. Modifiez ensuite JAVA_HOME pour qu'il pointe vers cet emplacement :
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
  1. Compiler le fichier jar de l'application
mvn clean package

L'archive spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar contenant le code et les dépendances de l'application est créée dans le répertoire spark/target.

  1. Envoyez l'application Spark :
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"

gcloud dataproc jobs submit spark \
    --cluster cdc-dataproc-cluster \
    --region us-central1 \
    --async \
    --jar target/$JAR \
    --max-failures-per-hour 10 \
    --properties $SPARK_PROPERTIES \
    -- $ARGUMENTS
  1. Affichez la liste des tâches actives, puis notez la valeur JOB_ID de la tâche :
gcloud dataproc jobs list --region=us-central1 --state-filter=active

Le résultat ressemblera à ceci :

JOB_ID                            TYPE   STATUS
473ecb6d14e2483cb88a18988a5b2e56  spark  RUNNING
  1. Affichez le résultat de la tâche en ouvrant l'URL suivante dans le navigateur. Remplacez [JOB_ID] par la valeur notée à l'étape précédente.
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
  1. Le résultat ressemble à ce qui suit :
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...

Le job de streaming Spark exécuté dans Dataproc extrait les messages de Pub/Sub, les traite et affiche le résultat dans la console.

  1. Arrêter le job : exécutez la commande suivante pour arrêter le job. Remplacez JOB_ID par celui que nous avons noté précédemment.
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet

Félicitations ! Vous venez de créer un puissant pipeline CDC qui capture les modifications de la base de données dans Pub/Sub et les traite à l'aide du streaming Spark exécuté dans Cloud Dataproc.

7. Effectuer un nettoyage

Nettoyez toutes les ressources que vous avez créées pour qu'elles ne vous soient plus facturées à l'avenir. Le moyen le plus simple d'empêcher la facturation est de supprimer le projet que vous avez créé pour ce tutoriel. Vous pouvez également supprimer des ressources individuelles.

Exécutez les commandes suivantes pour supprimer des ressources individuelles.

gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics delete database-changes --quiet
gcloud pubsub subscriptions delete change-subscriber --quiet
gcloud iam service-accounts delete <your-service-account-with-domain> --quiet

8. Félicitations

Félicitations, vous venez de terminer un atelier de programmation pratique qui vous montre comment créer un pipeline de données en temps réel robuste à l'aide de Google Cloud Platform. Récapitulons ce que vous avez accompli :

  • Capture de données modifiées (CDC) simulée : vous avez appris les principes de base de la CDC et implémenté un script Python pour simuler les modifications de la base de données, en générant des événements qui représentent les modifications des données en temps réel.
  • Cloud Pub/Sub utilisé : vous configurez des sujets et des abonnements Cloud Pub/Sub, ce qui fournit un service de messagerie évolutif et fiable pour diffuser vos événements CDC. Vous avez publié les modifications simulées de votre base de données sur Pub/Sub, créant ainsi un flux de données en temps réel.
  • Données traitées avec Dataproc et Spark : vous avez provisionné un cluster Dataproc et déployé un job Spark Streaming pour consommer les messages de votre abonnement Pub/Sub. Vous avez traité et transformé les événements CDC entrants en temps réel, et affiché les résultats dans les journaux de vos jobs Dataproc.
  • Créer un pipeline en temps réel de bout en bout : vous avez réussi à intégrer ces services pour créer un pipeline de données complet qui capture, diffuse et traite les modifications de données en temps réel. Vous avez acquis une expérience pratique dans la création d'un système capable de gérer des flux de données continus.
  • Utilisation du connecteur Spark Pub/Sub : vous avez configuré un cluster Dataproc pour utiliser le connecteur Spark Pub/Sub, ce qui est essentiel pour que Spark Structured Streaming puisse lire les données de Pub/Sub.

Vous disposez désormais d'une base solide pour créer des pipelines de données plus complexes et sophistiqués pour diverses applications, y compris l'analyse en temps réel, l'entreposage de données et les architectures de microservices. Continuez à explorer et à créer !

Documents de référence