Creazione di Change Data Capture utilizzando Dataproc e Cloud Pub/Sub

1. Introduzione

df8070bd84336207.png

Ultimo aggiornamento: 19/06/2025

Che cos'è Change Data Capture?

Change Data Capture (CDC) è un insieme di pattern di progettazione del software utilizzati per determinare e monitorare i dati modificati in un database. In termini più semplici, è un modo per acquisire e registrare le modifiche apportate ai dati in modo che possano essere replicate in altri sistemi.

Change Data Capture (CDC) è incredibilmente utile in un'ampia gamma di scenari basati sui dati, come migrazione dei dati, data warehousing e analisi in tempo reale, ripristino di emergenza e alta disponibilità, controllo e conformità e così via.

Migrazione dei dati

CDC semplifica i progetti di migrazione dei dati consentendo il trasferimento incrementale dei dati, riducendo i tempi di inattività e al minimo le interruzioni.

Data warehousing e analisi in tempo reale

CDC garantisce che i data warehouse e i sistemi analitici vengano costantemente aggiornati con le ultime modifiche apportate ai database operativi.

In questo modo, le aziende possono prendere decisioni basate su informazioni in tempo reale.

Ripristino di emergenza e alta disponibilità

CDC consente la replica in tempo reale dei dati nei database secondari a scopo di disaster recovery. In caso di errore, CDC consente il failover rapido a un database secondario, riducendo al minimo i tempi di inattività e la perdita di dati.

Controllo e conformità

CDC fornisce un audit trail dettagliato delle modifiche ai dati, essenziale per la conformità e i requisiti normativi.

Cosa creerai

In questo codelab creerai una pipeline di dati Change Data Capture (CDC) utilizzando Cloud Pub/Sub, Dataproc, Python e Apache Spark. La pipeline:

  • Simula le modifiche al database e pubblicale come eventi in Cloud Pub/Sub, un servizio di messaggistica scalabile e affidabile.
  • Sfrutta la potenza di Dataproc, il servizio Spark e Hadoop gestito di Google Cloud, per elaborare questi eventi in tempo reale.

Se colleghi questi servizi, creerai una pipeline solida in grado di acquisire ed elaborare le modifiche ai dati man mano che si verificano, fornendo una base per l'analisi in tempo reale, il data warehousing e altre applicazioni critiche.

Cosa imparerai a fare

  • Come creare una pipeline di base per l'acquisizione delle modifiche ai dati
  • Dataproc per l'elaborazione di stream
  • Cloud Pub/Sub per la messaggistica in tempo reale
  • Nozioni di base di Apache Spark

Questo codelab è incentrato su Dataproc e Cloud Pub/Sub. Concetti e blocchi di codice non pertinenti sono trattati solo superficialmente e sono forniti solo per operazioni di copia e incolla.

Che cosa ti serve

  • un account GCP attivo con un progetto configurato. Se non ne hai uno, puoi registrarti per una prova senza costi.
  • gcloud CLI installata e configurata.
  • Python 3.7+ installato per simulare le modifiche al database e interagire con Pub/Sub.
  • Conoscenza di base di Dataproc, Cloud Pub/Sub, Apache Spark e Python.

Prima di iniziare

Esegui questo comando nel terminale per abilitare le API richieste:

gcloud services enable \
    dataproc.googleapis.com \
    pubsub.googleapis.com \

2. Configurare Cloud Pub/Sub

Creare un argomento

Questo argomento verrà utilizzato per pubblicare le modifiche al database. Il job Dataproc sarà il consumer di questi messaggi e li elaborerà per l'acquisizione delle modifiche ai dati. Se vuoi saperne di più sugli argomenti, puoi leggere la documentazione ufficiale qui.

gcloud pubsub topics create database-changes

Crea un abbonamento

Crea una sottoscrizione che verrà utilizzata per utilizzare i messaggi in Pub/Sub. Per saperne di più sugli abbonamenti, puoi leggere la documentazione ufficiale qui.

gcloud pubsub subscriptions create --topic database-changes change-subscriber

3. Simula modifiche al database

Passi

  1. Crea uno script Python (ad es. simulate_cdc.py) per simulare le modifiche al database e pubblicarle in Pub/Sub.
from google.cloud import pubsub_v1
import json
import time
import random

project_id = "YOUR_PROJECT_ID"  # Replace with your GCP project ID
topic_id = "database-changes"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

def publish_message(data):
    data_str = json.dumps(data).encode("utf-8")
    future = publisher.publish(topic_path, data=data_str)
    print(f"Published message ID: {future.result()}")

def simulate_change():
    change_types = ["INSERT", "UPDATE", "DELETE"]
    change_type = random.choice(change_types)
    record_id = random.randint(1, 100)
    timestamp = time.time()
    change_data = {
        "change_type": change_type,
        "record_id": record_id,
        "timestamp": timestamp,
        "data": {"field1": "value1", "field2": "value2"}, #Place holder data.
    }
    publish_message(change_data)

if __name__ == "__main__":
    while True:
        simulate_change()
        time.sleep(2)  # Simulate changes every 2 seconds

Sostituisci YOUR_PROJECT_ID con l'ID progetto GCP effettivo.

  1. Installa la libreria client Pub/Sub:
pip install google-cloud-pubsub
  1. Esegui lo script sul terminale. Questo script verrà eseguito continuamente e pubblicherà messaggi ogni 2 secondi nell'argomento Pub/Sub.
python simulate_cdc.py
  1. Dopo aver eseguito lo script per, ad esempio, 1 minuto, avrai un numero sufficiente di messaggi in Pub/Sub da utilizzare. Puoi terminare lo script Python in esecuzione premendo Ctrl + C o Cmd + C, a seconda del sistema operativo.
  2. Visualizza messaggi pubblicati:

Apri un altro terminale ed esegui il comando seguente per visualizzare i messaggi pubblicati:

gcloud pubsub subscriptions pull --auto-ack change-subscriber

Dovresti vedere una riga della tabella contenente il messaggio e altri campi:

{"change_type": "UPDATE", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}

Spiegazione

  • Lo script Python simula le modifiche al database generando in modo casuale eventi INSERT, UPDATE o DELETE.
  • Ogni modifica è rappresentata come un oggetto JSON contenente il tipo di modifica, l'ID record, il timestamp e i dati.
  • Lo script utilizza la libreria client Cloud Pub/Sub per pubblicare questi eventi di modifica nell'argomento database-changes.
  • Il comando subscriber consente di visualizzare i messaggi inviati all'argomento Pub/Sub.

4. Crea un service account per Dataproc

In questa sezione crei un service account che il cluster Dataproc può utilizzare. Assegni anche le autorizzazioni necessarie per consentire alle istanze del cluster di accedere a Cloud Pub/Sub e Dataproc.

  1. Crea un service account:
export SERVICE_ACCOUNT_NAME="dataproc-service-account"

gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
  1. Aggiungi il ruolo di nodo di lavoro Dataproc per consentire al service account di creare cluster ed eseguire job. Aggiungi l'ID service account generato nel comando precedente come membro nel comando seguente:
export PROJECT=$(gcloud info --format='value(config.project)')

gcloud projects add-iam-policy-binding $PROJECT \
        --role roles/dataproc.worker \
        --member="serviceAccount:<your-service-account-with-domain>"
  1. Aggiungi il ruolo Sottoscrittore Pub/Sub per consentire al service account di sottoscrivere l'abbonamento Pub/Sub "change-subscriber":
gcloud beta pubsub subscriptions add-iam-policy-binding \
        change-subscriber \
        --role roles/pubsub.subscriber \
        --member="serviceAccount:<your-service-account-with-domain"

5. Crea un cluster Dataproc

Il cluster Dataproc eseguirà l'app Spark che elaborerà i messaggi in Pub/Sub. Devi avere il service account creato nella sezione precedente. Dataproc assegna questo service account a ogni istanza del cluster in modo che tutte le istanze ottengano le autorizzazioni corrette per eseguire l'app.

Utilizza il seguente comando per creare un cluster Dataproc:

gcloud dataproc clusters create cdc-dataproc-cluster \
    --region=us-central1 \
    --zone=us-central1-a \
    --scopes=pubsub,datastore \
    --image-version=1.3 \
    --service-account="<your-service-account-with-domain-id>"

6. Invia il job Spark al cluster Dataproc

L'app Spark Streaming elabora i messaggi di modifica del database in Pub/Sub e li stampa nella console.

Passi

  1. Crea una directory e aggiungi il codice sorgente del consumer al file PubsubConsumer.scala
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala 
package demo

import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PubsubConsumer {

  def createContext(projectID: String, checkpointDirectory: String)
    : StreamingContext = {

    // [START stream_setup]
    val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Set the checkpoint directory
    val yarnTags = sparkConf.get("spark.yarn.tags")
    val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
    ssc.checkpoint(checkpointDirectory + '/' + jobId)
    
    // Create stream
    val messagesStream: DStream[String] = PubsubUtils
      .createStream(
        ssc,
        projectID,
        None,
        "change-subscriber",  // Cloud Pub/Sub subscription for incoming database updates
        SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
      .map(message => new String(message.getData(), StandardCharsets.UTF_8))
    // [END stream_setup]

    processStringDStream(messagesStream)
    
        ssc
  }

  def processStringDStream(stringDStream: DStream[String]): Unit = {
    stringDStream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val listOfStrings: List[String] = rdd.collect().toList
        listOfStrings.foreach(str => println(s"message received: $str"))
      } else {
        println("looking for message...")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("arguments are not passed correctly!")
      System.exit(1)
    }

    val Seq(projectID, checkpointDirectory) = args.toSeq

    // Create Spark context
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => createContext(projectID, checkpointDirectory))

    // Start streaming until we receive an explicit termination
    ssc.start()
    ssc.awaitTermination()
  }

}
  1. Crea e aggiungi quanto segue a pom.xml
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <jvm.options.xms>-Xms512M</jvm.options.xms>
    <jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
    <jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
    <jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
  </properties>

  <groupId>dataproc-spark-demos</groupId>
  <artifactId>spark-streaming-pubsub-demo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.2.0</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-datastore</artifactId>
      <version>1.34.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>spark-streaming-pubsub_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>

    <dependency>
      <groupId>org.scalacheck</groupId>
      <artifactId>scalacheck_2.11</artifactId>
      <version>1.14.0</version>
      <scope>test</scope>
    </dependency>

  </dependencies>

  <build>
    <plugins>

      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.3.2</version>
        <executions>
          <execution>
            <id>compile</id>
            <goals>
              <goal>compile</goal>
            </goals>
            <phase>compile</phase>
          </execution>
          <execution>
            <id>test-compile</id>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <phase>test-compile</phase>
          </execution>
          <execution>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>demo.PubsubConsumer</mainClass>
                </transformer>
              </transformers>
              <relocations>
                <relocation>
                  <pattern>com</pattern>
                  <shadedPattern>repackaged.com</shadedPattern>
                  <includes>
                    <include>com.google.protobuf.**</include>
                    <include>com.google.common.**</include>
                  </includes>
                </relocation>
              </relocations>
            </configuration>
          </execution>
        </executions>
      </plugin>

    </plugins>
  </build>

</project>
  1. Passa alla directory Spark del progetto e salva il percorso in una variabile di ambiente da utilizzare in seguito:
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
  1. Cambia la directory:
cd $REPO_ROOT/spark
  1. Scarica Java 1.8 e posiziona la cartella in /usr/lib/jvm/. Quindi, modifica JAVA_HOME in modo che punti a questo percorso:
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
  1. Crea il file JAR dell'applicazione
mvn clean package

L'archivio spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar contenente il codice dell'applicazione e le dipendenze viene creato nella directory spark/target

  1. Invia la richiesta di Spark:
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"

gcloud dataproc jobs submit spark \
    --cluster cdc-dataproc-cluster \
    --region us-central1 \
    --async \
    --jar target/$JAR \
    --max-failures-per-hour 10 \
    --properties $SPARK_PROPERTIES \
    -- $ARGUMENTS
  1. Visualizza l'elenco dei job attivi e prendi nota del valore JOB_ID per il job:
gcloud dataproc jobs list --region=us-central1 --state-filter=active

L'output sarà simile a

JOB_ID                            TYPE   STATUS
473ecb6d14e2483cb88a18988a5b2e56  spark  RUNNING
  1. Visualizza l'output del job aprendo il seguente URL nel browser. Sostituisci [JOB_ID] con il valore annotato nel passaggio precedente.
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
  1. L'output è simile al seguente:
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...

Il job di streaming Spark in esecuzione in Dataproc recupera i messaggi da Pub/Sub, li elabora e visualizza l'output nella console.

  1. Terminazione del job: esegui questo comando per terminare il job. Sostituisci JOB_ID con lo stesso ID annotato in precedenza.
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet

Complimenti! Hai appena creato una potente pipeline CDC che acquisisce le modifiche al database in Pub/Sub e le elabora utilizzando Spark Streaming in esecuzione in Cloud Dataproc.

7. Esegui la pulizia

Esegui la pulizia delle risorse che hai creato in modo che non ti vengano addebitate in futuro. Il modo più semplice per eliminare la fatturazione è eliminare il progetto creato per il tutorial. In alternativa, puoi eliminare le singole risorse.

Esegui i seguenti comandi per eliminare le singole risorse

gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics delete database-changes --quiet
gcloud pubsub subscriptions delete change-subscriber --quiet
gcloud iam service-accounts delete <your-service-account-with-domain> --quiet

8. Complimenti

Congratulazioni, hai appena completato un codelab pratico che mostra come creare una pipeline di dati in tempo reale solida utilizzando Google Cloud. Ricapitoliamo cosa hai fatto:

  • Change Data Capture (CDC) simulato: hai appreso le nozioni di base di CDC e hai implementato uno script Python per simulare le modifiche al database, generando eventi che rappresentano le modifiche ai dati in tempo reale.
  • Cloud Pub/Sub utilizzato: hai configurato argomenti e abbonamenti Cloud Pub/Sub, fornendo un servizio di messaggistica scalabile e affidabile per lo streaming degli eventi CDC. Hai pubblicato le modifiche al database simulate su Pub/Sub, creando uno stream di dati in tempo reale.
  • Dati elaborati con Dataproc e Spark:hai eseguito il provisioning di un cluster Dataproc e hai eseguito il deployment di un job Spark Streaming per utilizzare i messaggi dell'abbonamento Pub/Sub. Hai elaborato e trasformato gli eventi CDC in entrata in tempo reale, visualizzando i risultati nei log dei job Dataproc.
  • Creazione di una pipeline end-to-end in tempo reale:hai integrato correttamente questi servizi per creare una pipeline di dati completa che acquisisce, trasmette in streaming ed elabora le modifiche ai dati in tempo reale. Hai acquisito esperienza pratica nella creazione di un sistema in grado di gestire flussi di dati continui.
  • Utilizzo del connettore Spark Pub/Sub:hai configurato correttamente un cluster Dataproc per utilizzare il connettore Spark Pub/Sub, che è fondamentale per Spark Structured Streaming per leggere i dati da Pub/Sub.

Ora hai una base solida per creare pipeline di dati più complesse e sofisticate per varie applicazioni, tra cui analisi in tempo reale, data warehousing e architetture di microservizi. Continua a esplorare e costruire.

Documentazione di riferimento