1. Introduzione
Ultimo aggiornamento: 19/06/2025
Che cos'è Change Data Capture?
Change Data Capture (CDC) è un insieme di pattern di progettazione software utilizzati per determinare e monitorare i dati che sono stati modificati in un database. In termini più semplici, è un modo per acquisire e registrare le modifiche apportate ai dati in modo che possano essere replicate in altri sistemi.
Change Data Capture (CDC) è incredibilmente utile in una vasta gamma di scenari basati sui dati, come la migrazione dei dati, l'analisi e il data warehousing in tempo reale, il disaster recovery e l'alta disponibilità, la conformità e il controllo e così via.
Migrazione dei dati
La tecnologia CDC semplifica i progetti di migrazione dei dati consentendo il trasferimento incrementale dei dati, riducendo i tempi di inattività e le interruzioni al minimo.
Data warehousing e analisi in tempo reale
La CDC garantisce che i data warehouse e i sistemi di analisi vengano aggiornati costantemente con le ultime modifiche dei database operativi.
In questo modo, le aziende possono prendere decisioni in base a informazioni in tempo reale.
Disaster Recovery e alta disponibilità
La CDC consente la replica in tempo reale dei dati in database secondari a fini di disaster recovery. In caso di errore, la funzionalità CDC consente di eseguire rapidamente il failover su un database secondario, riducendo al minimo i tempi di inattività e la perdita di dati.
Controllo e conformità
CDC fornisce una traccia di controllo dettagliata delle modifiche ai dati, che è essenziale per la conformità e i requisiti normativi.
Cosa creerai
In questo codelab, creerai una pipeline di dati con Change Data Capture (CDC) utilizzando Cloud Pub/Sub, Dataproc, Python e Apache Spark. La pipeline:
- Simula le modifiche al database e pubblicale come eventi in Cloud Pub/Sub, un servizio di messaggistica scalabile e affidabile.
- Sfrutta la potenza di Dataproc, il servizio Spark e Hadoop gestito di Google Cloud, per elaborare questi eventi in tempo reale.
Collegando questi servizi, creerai una pipeline solida in grado di acquisire ed elaborare le modifiche ai dati man mano che si verificano, fornendo una base per l'analisi in tempo reale, il data warehousing e altre applicazioni fondamentali.
Cosa imparerai a fare
- Come creare una pipeline di base per il rilevamento dei dati modificati
- Dataproc per l'elaborazione dei flussi
- Cloud Pub/Sub per la messaggistica in tempo reale
- Nozioni di base su Apache Spark
Questo codelab è incentrato su Dataproc e Cloud Pub/Sub. Concetti e blocchi di codice non pertinenti sono trattati solo superficialmente e sono forniti solo per operazioni di copia e incolla.
Che cosa ti serve
- Un account Google Cloud attivo con un progetto configurato. Se non ne hai uno, puoi registrarti per una prova senza costi.
- gcloud CLI installata e configurata.
- Python 3.7 o versioni successive installate per simulare le modifiche al database e interagire con Pub/Sub.
- Conoscenze di base di Dataproc, Cloud Pub/Sub, Apache Spark e Python.
Prima di iniziare
Esegui il seguente comando nel terminale per abilitare le API richieste:
gcloud services enable \
dataproc.googleapis.com \
pubsub.googleapis.com \
2. Configurare Cloud Pub/Sub
Creare un argomento
Questo argomento verrà utilizzato per pubblicare le modifiche al database. Il job Dataproc sarà il consumatore di questi messaggi ed elaborerà i messaggi per il rilevamento dei dati sulle modifiche. Per scoprire di più sugli argomenti, puoi leggere la documentazione ufficiale qui.
gcloud pubsub topics create database-changes
Creare una sottoscrizione
Crea una sottoscrizione che verrà utilizzata per consumare i messaggi in Pub/Sub. Per scoprire di più sugli abbonamenti, puoi leggere la documentazione ufficiale qui.
gcloud pubsub subscriptions create --topic database-changes change-subscriber
3. Simulare modifiche al database
Passi
- Crea uno script Python (ad es.
simulate_cdc.py
) per simulare le modifiche al database e pubblicarle in Pub/Sub.
from google.cloud import pubsub_v1
import json
import time
import random
project_id = "YOUR_PROJECT_ID" # Replace with your GCP project ID
topic_id = "database-changes"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
def publish_message(data):
data_str = json.dumps(data).encode("utf-8")
future = publisher.publish(topic_path, data=data_str)
print(f"Published message ID: {future.result()}")
def simulate_change():
change_types = ["INSERT", "UPDATE", "DELETE"]
change_type = random.choice(change_types)
record_id = random.randint(1, 100)
timestamp = time.time()
change_data = {
"change_type": change_type,
"record_id": record_id,
"timestamp": timestamp,
"data": {"field1": "value1", "field2": "value2"}, #Place holder data.
}
publish_message(change_data)
if __name__ == "__main__":
while True:
simulate_change()
time.sleep(2) # Simulate changes every 2 seconds
Sostituisci YOUR_PROJECT_ID con l'ID progetto Google Cloud effettivo
- Installa la libreria client Pub/Sub:
pip install google-cloud-pubsub
- Esegui lo script sul terminale. Questo script verrà eseguito continuamente e pubblicherà messaggi ogni 2 secondi nell'argomento Pub/Sub.
python simulate_cdc.py
- Dopo aver eseguito lo script per, diciamo, 1 minuto, avrai a disposizione un numero sufficiente di messaggi in Pub/Sub da consumare. Puoi terminare lo script Python in esecuzione premendo Ctrl + C o Cmd + C, a seconda del sistema operativo.
- Visualizza messaggi pubblicati:
Apri un altro terminale ed esegui il seguente comando per visualizzare i messaggi pubblicati:
gcloud pubsub subscriptions pull --auto-ack change-subscriber
Dovresti vedere una riga di tabella contenente il messaggio e altri campi:
{"change_type": "UPDATE", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}
Spiegazione
- Lo script Python simula le modifiche al database generando in modo casuale eventi
INSERT
,UPDATE
oDELETE
. - Ogni modifica è rappresentata come un oggetto JSON contenente il tipo di modifica, l'ID record, il timestamp e i dati.
- Lo script utilizza la libreria client Cloud Pub/Sub per pubblicare questi eventi di modifica nell'argomento
database-changes
. - Il comando subscriber consente di visualizzare i messaggi inviati all'argomento Pub/Sub.
4. Creare un account di servizio per Dataproc
In questa sezione crei un account di servizio che il cluster Dataproc può utilizzare. Assegna inoltre le autorizzazioni necessarie per consentire alle istanze del cluster di accedere a Cloud Pub/Sub e Dataproc.
- Crea un account di servizio:
export SERVICE_ACCOUNT_NAME="dataproc-service-account"
gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
- Aggiungi il ruolo del nodo di lavoro Dataproc per consentire all'account di servizio di creare cluster ed eseguire job. Aggiungi l'ID account di servizio generato nel comando precedente come membro nel comando seguente:
export PROJECT=$(gcloud info --format='value(config.project)')
gcloud projects add-iam-policy-binding $PROJECT \
--role roles/dataproc.worker \
--member="serviceAccount:<your-service-account-with-domain>"
- Aggiungi il ruolo di sottoscrittore Pub/Sub per consentire all'account di servizio di iscriversi all'iscrizione Pub/Sub "change-subscriber":
gcloud beta pubsub subscriptions add-iam-policy-binding \
change-subscriber \
--role roles/pubsub.subscriber \
--member="serviceAccount:<your-service-account-with-domain"
5. Crea un cluster Dataproc
Il cluster Dataproc eseguirà l'app Spark che elaborerà i messaggi in Pub/Sub. Devi avere l'account di servizio creato nella sezione precedente. Dataproc assegna questo account di servizio a ogni istanza del cluster in modo che tutte le istanze ricevano le autorizzazioni corrette per eseguire l'app.
Utilizza il seguente comando per creare un cluster Dataproc:
gcloud dataproc clusters create cdc-dataproc-cluster \
--region=us-central1 \
--zone=us-central1-a \
--scopes=pubsub,datastore \
--image-version=1.3 \
--service-account="<your-service-account-with-domain-id>"
6. Invia il job Spark al cluster Dataproc
L'app di streaming Spark elabora i messaggi di modifica del database in Pub/Sub e li stampa nella console.
Passi
- Crea una directory e aggiungi il codice sorgente del consumatore al file PubsubConsumer.scala
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala
package demo
import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object PubsubConsumer {
def createContext(projectID: String, checkpointDirectory: String)
: StreamingContext = {
// [START stream_setup]
val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
val ssc = new StreamingContext(sparkConf, Seconds(1))
// Set the checkpoint directory
val yarnTags = sparkConf.get("spark.yarn.tags")
val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
ssc.checkpoint(checkpointDirectory + '/' + jobId)
// Create stream
val messagesStream: DStream[String] = PubsubUtils
.createStream(
ssc,
projectID,
None,
"change-subscriber", // Cloud Pub/Sub subscription for incoming database updates
SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
.map(message => new String(message.getData(), StandardCharsets.UTF_8))
// [END stream_setup]
processStringDStream(messagesStream)
ssc
}
def processStringDStream(stringDStream: DStream[String]): Unit = {
stringDStream.foreachRDD { rdd =>
if (!rdd.isEmpty()) {
val listOfStrings: List[String] = rdd.collect().toList
listOfStrings.foreach(str => println(s"message received: $str"))
} else {
println("looking for message...")
}
}
}
def main(args: Array[String]): Unit = {
if (args.length != 2) {
System.err.println("arguments are not passed correctly!")
System.exit(1)
}
val Seq(projectID, checkpointDirectory) = args.toSeq
// Create Spark context
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
() => createContext(projectID, checkpointDirectory))
// Start streaming until we receive an explicit termination
ssc.start()
ssc.awaitTermination()
}
}
- Crea e aggiungi quanto segue a pom.xml
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<jvm.options.xms>-Xms512M</jvm.options.xms>
<jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
<jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
<jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
</properties>
<groupId>dataproc-spark-demos</groupId>
<artifactId>spark-streaming-pubsub-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-datastore</artifactId>
<version>1.34.0</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>spark-streaming-pubsub_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_2.11</artifactId>
<version>1.14.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.3.2</version>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
</goals>
<phase>compile</phase>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
<phase>test-compile</phase>
</execution>
<execution>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>demo.PubsubConsumer</mainClass>
</transformer>
</transformers>
<relocations>
<relocation>
<pattern>com</pattern>
<shadedPattern>repackaged.com</shadedPattern>
<includes>
<include>com.google.protobuf.**</include>
<include>com.google.common.**</include>
</includes>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
- Passa alla directory Spark del progetto e salva il percorso in una variabile di ambiente da utilizzare in seguito:
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
- Cambia la directory:
cd $REPO_ROOT/spark
- Scarica Java 1.8 e posiziona la cartella in /usr/lib/jvm/. Quindi, modifica JAVA_HOME in modo che punti a quanto segue:
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
- Crea il file jar dell'applicazione
mvn clean package
L'archivio spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar contenente il codice dell'applicazione e le dipendenze viene creato nella directory spark/target
- Invia la richiesta di Spark:
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"
gcloud dataproc jobs submit spark \
--cluster cdc-dataproc-cluster \
--region us-central1 \
--async \
--jar target/$JAR \
--max-failures-per-hour 10 \
--properties $SPARK_PROPERTIES \
-- $ARGUMENTS
- Visualizza l'elenco dei job attivi e prendi nota del valore
JOB_ID
per il job:
gcloud dataproc jobs list --region=us-central1 --state-filter=active
L'output sarà simile a
JOB_ID TYPE STATUS
473ecb6d14e2483cb88a18988a5b2e56 spark RUNNING
- Visualizza l'output del job aprendo il seguente URL nel browser. Sostituisci [JOB_ID] con il valore annotato nel passaggio precedente.
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
- L'output è simile al seguente:
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
Il job di streaming Spark in esecuzione in Dataproc estrae i messaggi da Pub/Sub, li elabora e mostra l'output nella console.
- Terminazione del job: esegui il seguente comando per terminare il job. Sostituisci JOB_ID con lo stesso che abbiamo annotato in precedenza
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet
Complimenti! Hai appena creato una potente pipeline CDC che acquisisce le modifiche al database in Pub/Sub ed elabora utilizzando Spark Streaming in esecuzione in Cloud Dataproc.
7. Esegui la pulizia
Elimina le risorse che hai creato in modo che non ti vengano addebitate in futuro. Il modo più semplice per eliminare la fatturazione è eliminare il progetto creato per il tutorial. In alternativa, puoi eliminare le singole risorse.
Esegui i seguenti comandi per eliminare le singole risorse
gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics delete database-changes --quiet
gcloud pubsub subscriptions delete change-subscriber --quiet
gcloud iam service-accounts delete <your-service-account-with-domain> --quiet
8. Complimenti
Congratulazioni, hai appena completato un codelab pratico che mostra come creare una solida pipeline di dati in tempo reale utilizzando la piattaforma Google Cloud. Ricapitoliamo cosa hai realizzato:
- Change Data Capture (CDC) simulato: hai appreso i fondamenti della CDC e hai implementato uno script Python per simulare le modifiche al database, generando eventi che rappresentano le modifiche dei dati in tempo reale.
- Cloud Pub/Sub sfruttato:configuri argomenti e abbonamenti Cloud Pub/Sub, fornendo un servizio di messaggistica scalabile e affidabile per lo streaming degli eventi CDC. Hai pubblicato le modifiche simulate del database in Pub/Sub, creando uno stream di dati in tempo reale.
- Dati elaborati con Dataproc e Spark:hai eseguito il provisioning di un cluster Dataproc e hai dispiegato un job Spark Streaming per consumare i messaggi dal tuo abbonamento Pub/Sub. Hai elaborato e trasformato gli eventi CDC in entrata in tempo reale, visualizzando i risultati nei log dei job Dataproc.
- Hai creato una pipeline in tempo reale end-to-end:hai integrato correttamente questi servizi per creare una pipeline di dati completa che acquisisce, acquisisce in streaming ed elabora le modifiche dei dati in tempo reale. Hai acquisito esperienza pratica nella creazione di un sistema in grado di gestire flussi di dati continui.
- Hai utilizzato il connettore Spark Pub/Sub:hai configurato correttamente un cluster Dataproc per utilizzare il connettore Spark Pub/Sub, che è fondamentale per consentire a Spark Structured Streaming di leggere i dati da Pub/Sub.
Ora hai una base solida per creare pipeline di dati più complesse e sofisticate per varie applicazioni, tra cui analisi in tempo reale, data warehousing e architetture di microservizi. Continua a esplorare e a creare.