1. Introducción
Última actualización: 19/6/2025
¿Qué es la captura de datos modificados?
La captura de datos modificados (CDC) es un conjunto de patrones de diseño de software que se usan para determinar y hacer un seguimiento de los datos que cambiaron en una base de datos. En términos más simples, es una forma de capturar y registrar los cambios realizados en los datos para que se puedan replicar en otros sistemas.
La captura de datos modificados (CDC) es increíblemente útil en una amplia variedad de situaciones basadas en datos, como la migración de datos, el análisis y el almacenamiento de datos en tiempo real, la recuperación ante desastres y la alta disponibilidad, la auditoría y el cumplimiento, etcétera.
Migración de datos
El CDC simplifica los proyectos de migración de datos, ya que permite la transferencia de datos incremental, reduce el tiempo de inactividad y minimiza las interrupciones.
Almacén de datos y estadísticas en tiempo real
La CDC garantiza que los almacenes de datos y los sistemas de análisis se actualicen constantemente con los cambios más recientes de las bases de datos operativas.
Esto permite a las empresas tomar decisiones basadas en información en tiempo real.
Recuperación ante desastres y alta disponibilidad
La CDC permite la replicación de datos en tiempo real en bases de datos secundarias para la recuperación ante desastres. En caso de falla, la CDC permite una conmutación por error rápida a una base de datos secundaria, lo que minimiza el tiempo de inactividad y la pérdida de datos.
Auditoría y cumplimiento
El CDC proporciona un registro de auditoría detallado de los cambios en los datos, lo que es esencial para el cumplimiento de los requisitos regulatorios.
Qué compilarás
En este codelab, compilarás una canalización de datos de captura de datos modificados (CDC) con Cloud Pub/Sub, Dataproc, Python y Apache Spark. Tu canalización hará lo siguiente:
- Simula los cambios en la base de datos y publícalos como eventos en Cloud Pub/Sub, un servicio de mensajería escalable y confiable.
- Aprovecha la potencia de Dataproc, el servicio administrado de Spark y Hadoop de Google Cloud, para procesar estos eventos en tiempo real.
Si conectas estos servicios, crearás una canalización sólida capaz de capturar y procesar los cambios de datos a medida que ocurren, lo que proporciona una base para el análisis en tiempo real, el almacenamiento de datos y otras aplicaciones fundamentales.
Qué aprenderás
- Cómo crear una canalización básica de captura de datos modificados
- Dataproc para el procesamiento de transmisión
- Cloud Pub/Sub para mensajes en tiempo real
- Conceptos básicos de Apache Spark
Este codelab se enfoca en Dataproc y Cloud Pub/Sub. Los conceptos y los bloques de código no relacionados con esos temas se abordan superficialmente y se proporcionan para que simplemente los copies y pegues.
Requisitos
- Una cuenta de GCP activa con un proyecto configurado Si no tienes una, puedes registrarte para obtener una prueba gratuita.
- gcloud CLI instalada y configurada
- Python 3.7 o versiones posteriores instalado para simular cambios en la base de datos y para interactuar con Pub/Sub
- Conocimientos básicos de Dataproc, Cloud Pub/Sub, Apache Spark y Python
Antes de comenzar
Ejecuta el siguiente comando en la terminal para habilitar las APIs requeridas:
gcloud services enable \
dataproc.googleapis.com \
pubsub.googleapis.com \
2. Configura Cloud Pub/Sub
Crea un tema
Este tema se usará para publicar los cambios de la base de datos. El trabajo de Dataproc será el consumidor de estos mensajes y procesará los mensajes para la captura de datos de cambios. Si quieres obtener más información sobre los temas, puedes leer la documentación oficial aquí.
gcloud pubsub topics create database-changes
Crea una suscripción
Crea una suscripción que se usará para consumir los mensajes de Pub/Sub. Para obtener más información sobre las suscripciones, puedes leer la documentación oficial aquí.
gcloud pubsub subscriptions create --topic database-changes change-subscriber
3. Simula cambios en la base de datos
Pasos
- Crea una secuencia de comandos de Python (p.ej.,
simulate_cdc.py
) para simular cambios en la base de datos y publicarlos en Pub/Sub.
from google.cloud import pubsub_v1
import json
import time
import random
project_id = "YOUR_PROJECT_ID" # Replace with your GCP project ID
topic_id = "database-changes"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
def publish_message(data):
data_str = json.dumps(data).encode("utf-8")
future = publisher.publish(topic_path, data=data_str)
print(f"Published message ID: {future.result()}")
def simulate_change():
change_types = ["INSERT", "UPDATE", "DELETE"]
change_type = random.choice(change_types)
record_id = random.randint(1, 100)
timestamp = time.time()
change_data = {
"change_type": change_type,
"record_id": record_id,
"timestamp": timestamp,
"data": {"field1": "value1", "field2": "value2"}, #Place holder data.
}
publish_message(change_data)
if __name__ == "__main__":
while True:
simulate_change()
time.sleep(2) # Simulate changes every 2 seconds
Reemplaza YOUR_PROJECT_ID por el ID real de tu proyecto de GCP.
- Instala la biblioteca cliente de Pub/Sub:
pip install google-cloud-pubsub
- Ejecuta la secuencia de comandos en la terminal. Esta secuencia de comandos se ejecutará de forma continua y publicará mensajes cada 2 segundos en el tema de Pub/Sub.
python simulate_cdc.py
- Después de ejecutar la secuencia de comandos durante, digamos, 1 minuto, tendrás suficientes mensajes en Pub/Sub para consumir. Para finalizar la secuencia de comandos de Python en ejecución, presiona Ctrl + C o Cmd + C, según el SO.
- Ver mensajes publicados:
Abre otra terminal y ejecuta el siguiente comando para ver los mensajes publicados:
gcloud pubsub subscriptions pull --auto-ack change-subscriber
Deberías ver una fila de la tabla que contiene el mensaje y otros campos:
{"change_type": "UPDATE", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}
Explicación
- La secuencia de comandos de Python simula los cambios en la base de datos generando aleatoriamente eventos
INSERT
,UPDATE
oDELETE
. - Cada cambio se representa como un objeto JSON que contiene el tipo de cambio, el ID del registro, la marca de tiempo y los datos.
- La secuencia de comandos usa la biblioteca cliente de Cloud Pub/Sub para publicar estos eventos de cambio en el tema
database-changes
. - El comando subscriber te permite ver los mensajes que se envían al tema de Pub/Sub.
4. Crea una cuenta de servicio para Dataproc
En esta sección, crearás una cuenta de servicio que el clúster de Dataproc puede usar. También puedes asignar los permisos necesarios para permitir que las instancias de clústeres accedan a Cloud Pub/Sub y Dataproc.
- Crear una cuenta de servicio:
export SERVICE_ACCOUNT_NAME="dataproc-service-account"
gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
- Agrega el rol de trabajador de Dataproc para permitir que la cuenta de servicio cree clústeres y ejecute trabajos. Agrega el ID de la cuenta de servicio generado en el comando anterior como miembro en el siguiente comando:
export PROJECT=$(gcloud info --format='value(config.project)')
gcloud projects add-iam-policy-binding $PROJECT \
--role roles/dataproc.worker \
--member="serviceAccount:<your-service-account-with-domain>"
- Agrega el rol de suscriptor de Pub/Sub para permitir que la cuenta de servicio se suscriba a la suscripción de Pub/Sub "change-subscriber":
gcloud beta pubsub subscriptions add-iam-policy-binding \
change-subscriber \
--role roles/pubsub.subscriber \
--member="serviceAccount:<your-service-account-with-domain"
5. Cree un clúster de Dataproc
El clúster de Dataproc ejecutará la app de Spark, que procesará los mensajes en Pub/Sub. Necesitarás la cuenta de servicio que creaste en la sección anterior. Dataproc asigna esta cuenta de servicio a todas las instancias del clúster para que todas estas obtengan los permisos correctos a fin de ejecutar la app.
Usa el siguiente comando para crear un clúster de Dataproc:
gcloud dataproc clusters create cdc-dataproc-cluster \
--region=us-central1 \
--zone=us-central1-a \
--scopes=pubsub,datastore \
--image-version=1.3 \
--service-account="<your-service-account-with-domain-id>"
6. Envía el trabajo de Spark al clúster de Dataproc
La app de transmisión de Spark procesa los mensajes de cambio de la base de datos en Pub/Sub y los imprime en la consola.
Pasos
- Crea un directorio y agrega el código fuente del consumidor al archivo PubsubConsumer.scala.
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala
package demo
import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object PubsubConsumer {
def createContext(projectID: String, checkpointDirectory: String)
: StreamingContext = {
// [START stream_setup]
val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
val ssc = new StreamingContext(sparkConf, Seconds(1))
// Set the checkpoint directory
val yarnTags = sparkConf.get("spark.yarn.tags")
val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
ssc.checkpoint(checkpointDirectory + '/' + jobId)
// Create stream
val messagesStream: DStream[String] = PubsubUtils
.createStream(
ssc,
projectID,
None,
"change-subscriber", // Cloud Pub/Sub subscription for incoming database updates
SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
.map(message => new String(message.getData(), StandardCharsets.UTF_8))
// [END stream_setup]
processStringDStream(messagesStream)
ssc
}
def processStringDStream(stringDStream: DStream[String]): Unit = {
stringDStream.foreachRDD { rdd =>
if (!rdd.isEmpty()) {
val listOfStrings: List[String] = rdd.collect().toList
listOfStrings.foreach(str => println(s"message received: $str"))
} else {
println("looking for message...")
}
}
}
def main(args: Array[String]): Unit = {
if (args.length != 2) {
System.err.println("arguments are not passed correctly!")
System.exit(1)
}
val Seq(projectID, checkpointDirectory) = args.toSeq
// Create Spark context
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
() => createContext(projectID, checkpointDirectory))
// Start streaming until we receive an explicit termination
ssc.start()
ssc.awaitTermination()
}
}
- Crea lo siguiente y agrégalo a pom.xml
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<jvm.options.xms>-Xms512M</jvm.options.xms>
<jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
<jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
<jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
</properties>
<groupId>dataproc-spark-demos</groupId>
<artifactId>spark-streaming-pubsub-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-datastore</artifactId>
<version>1.34.0</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>spark-streaming-pubsub_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_2.11</artifactId>
<version>1.14.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.3.2</version>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
</goals>
<phase>compile</phase>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
<phase>test-compile</phase>
</execution>
<execution>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>demo.PubsubConsumer</mainClass>
</transformer>
</transformers>
<relocations>
<relocation>
<pattern>com</pattern>
<shadedPattern>repackaged.com</shadedPattern>
<includes>
<include>com.google.protobuf.**</include>
<include>com.google.common.**</include>
</includes>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
- Cambia al directorio spark del proyecto y guarda la ruta en una variable de entorno para usarla más adelante:
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
- Cambia el directorio:
cd $REPO_ROOT/spark
- Descarga Java 1.8 y coloca la carpeta en /usr/lib/jvm/. Luego, cambia JAVA_HOME para que apunte a lo siguiente:
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
- Compila el archivo jar de la aplicación
mvn clean package
El archivo spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar que contiene el código de la aplicación y las dependencias se crea en el directorio spark/target
.
- Envía la solicitud de Spark:
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"
gcloud dataproc jobs submit spark \
--cluster cdc-dataproc-cluster \
--region us-central1 \
--async \
--jar target/$JAR \
--max-failures-per-hour 10 \
--properties $SPARK_PROPERTIES \
-- $ARGUMENTS
- Muestra la lista de los trabajos activos y anota el valor
JOB_ID
para el trabajo:
gcloud dataproc jobs list --region=us-central1 --state-filter=active
El resultado se verá similar a lo siguiente:
JOB_ID TYPE STATUS
473ecb6d14e2483cb88a18988a5b2e56 spark RUNNING
- La salida del trabajo se puede ver cuando abres la siguiente URL en tu navegador. Reemplaza [JOB_ID] por el valor que anotaste en el paso anterior.
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
- El resultado es similar a este:
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
El trabajo de transmisión de Spark que se ejecuta en Dataproc extrae mensajes de Pub/Sub, los procesa y muestra el resultado en la consola.
- Finalización del trabajo: Ejecuta el siguiente comando para finalizar el trabajo. Reemplaza JOB_ID por el mismo que anotamos antes.
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet
¡Felicitaciones! Acabas de crear una canalización de CDC potente que captura los cambios de la base de datos en Pub/Sub y los procesa con la transmisión de Spark que se ejecuta en Cloud Dataproc.
7. Limpia
Limpia todos los recursos que creaste para que no se facturen en el futuro. La manera más fácil de eliminar la facturación es borrar el proyecto que creaste para el instructivo. También puedes borrar recursos individuales.
Ejecuta los siguientes comandos para borrar recursos individuales
gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics delete database-changes --quiet
gcloud pubsub subscriptions delete change-subscriber --quiet
gcloud iam service-accounts delete <your-service-account-with-domain> --quiet
8. Felicitaciones
¡Felicitaciones! Acabas de completar un codelab práctico en el que se muestra cómo compilar una canalización de datos en tiempo real sólida con Google Cloud Platform. Repasemos lo que lograste:
- Captura de datos modificados (CDC) simulada: Aprendiste los conceptos básicos de la CDC y, luego, implementaste una secuencia de comandos de Python para simular cambios en la base de datos y generar eventos que representen modificaciones de datos en tiempo real.
- Aprovecha Cloud Pub/Sub: Configura temas y suscripciones de Cloud Pub/Sub para proporcionar un servicio de mensajería escalable y confiable para transmitir tus eventos de CDC. Publicaste los cambios simulados de la base de datos en Pub/Sub, lo que creó un flujo de datos en tiempo real.
- Datos procesados con Dataproc y Spark: Aprovisionaste un clúster de Dataproc y, luego, implementaste un trabajo de transmisión de Spark para consumir mensajes de tu suscripción a Pub/Sub. Procesaste y transformaste los eventos de CDC entrantes en tiempo real y mostraste los resultados en los registros de trabajo de Dataproc.
- Creaste una canalización de extremo a extremo en tiempo real: Integraste correctamente estos servicios para crear una canalización de datos completa que captura, transmite y procesa los cambios de datos en tiempo real. Adquiriste experiencia práctica en la compilación de un sistema que puede controlar flujos de datos continuos.
- Usaste el conector de Pub/Sub de Spark: Configuraste correctamente un clúster de Dataproc para usar el conector de Pub/Sub de Spark, que es fundamental para que la transmisión estructurada de Spark lea datos de Pub/Sub.
Ahora tienes una base sólida para crear canalizaciones de datos más complejas y sofisticadas para varias aplicaciones, incluidos los análisis en tiempo real, el almacenamiento de datos y las arquitecturas de microservicios. Sigue explorando y creando.