Cómo compilar la Captura de datos modificados con Dataproc y Cloud Pub/Sub

1. Introducción

df8070bd84336207.png

Última actualización: 19/6/2025

¿Qué es la captura de datos modificados?

La captura de datos modificados (CDC) es un conjunto de patrones de diseño de software que se usan para determinar y hacer un seguimiento de los datos que cambiaron en una base de datos. En términos más simples, es una forma de capturar y registrar los cambios realizados en los datos para que esos cambios se puedan replicar en otros sistemas.

La captura de datos modificados (CDC) es increíblemente útil en una amplia variedad de situaciones basadas en datos, como la migración de datos, el almacenamiento y el análisis de datos en tiempo real, la recuperación ante desastres y la alta disponibilidad, la auditoría y el cumplimiento, etc.

Migración de datos

La CDC simplifica los proyectos de migración de datos, ya que permite la transferencia incremental de datos, lo que reduce el tiempo de inactividad y minimiza las interrupciones.

Almacenamiento y análisis de datos en tiempo real

La CDC garantiza que los almacenes de datos y los sistemas analíticos se actualicen constantemente con los cambios más recientes de las bases de datos operativas.

Esto permite que las empresas tomen decisiones basadas en información en tiempo real.

Recuperación ante desastres y alta disponibilidad

La CDC permite la replicación de datos en tiempo real en bases de datos secundarias para fines de recuperación ante desastres. En caso de falla, la CDC permite una conmutación por error rápida a una base de datos secundaria, lo que minimiza el tiempo de inactividad y la pérdida de datos.

Auditoría y cumplimiento

La CDC proporciona un registro de auditoría detallado de los cambios en los datos, lo que es esencial para el cumplimiento y los requisitos reglamentarios.

Qué compilarás

En este codelab, compilarás una canalización de datos de captura de datos de cambio (CDC) con Cloud Pub/Sub, Dataproc, Python y Apache Spark. Tu canalización hará lo siguiente:

  • Simular cambios en la base de datos y publicarlos como eventos en Cloud Pub/Sub, un servicio de mensajería escalable y confiable
  • Aprovecha la potencia de Dataproc, el servicio administrado de Spark y Hadoop de Google Cloud, para procesar estos eventos en tiempo real.

Si conectas estos servicios, crearás una canalización sólida capaz de capturar y procesar los cambios de datos a medida que ocurren, lo que proporcionará una base para el análisis en tiempo real, el almacenamiento de datos y otras aplicaciones críticas.

Qué aprenderás

  • Cómo crear una canalización básica de captura de datos modificados
  • Dataproc para el procesamiento de transmisiones
  • Cloud Pub/Sub para la mensajería en tiempo real
  • Conceptos básicos de Apache Spark

Este codelab se enfoca en Dataproc y Cloud Pub/Sub. Los conceptos y los bloques de código no relacionados con esos temas se abordan superficialmente y se proporcionan para que simplemente los copies y pegues.

Requisitos

  • Una cuenta de GCP activa con un proyecto configurado Si no tienes una, puedes registrarte para obtener una prueba gratuita.
  • La CLI de gcloud está instalada y configurada.
  • Python 3.7 o versiones posteriores instalado para simular cambios en la base de datos y para interactuar con Pub/Sub
  • Conocimientos básicos de Dataproc, Cloud Pub/Sub, Apache Spark y Python

Antes de comenzar

Ejecuta el siguiente comando en la terminal para habilitar las APIs requeridas:

gcloud services enable \
    dataproc.googleapis.com \
    pubsub.googleapis.com \

2. Configura Cloud Pub/Sub

Crea un tema

Este tema se usará para publicar los cambios en la base de datos. El trabajo de Dataproc será el consumidor de estos mensajes y los procesará para la captura de datos de cambio. Si quieres obtener más información sobre los temas, puedes leer la documentación oficial aquí.

gcloud pubsub topics create database-changes

Crear una suscripción

Crea una suscripción que se usará para consumir los mensajes en Pub/Sub. Para obtener más información sobre las suscripciones, puedes leer la documentación oficial aquí.

gcloud pubsub subscriptions create --topic database-changes change-subscriber

3. Simula cambios en la base de datos

Pasos

  1. Crea una secuencia de comandos de Python (p.ej., simulate_cdc.py) para simular cambios en la base de datos y publicarlos en Pub/Sub.
from google.cloud import pubsub_v1
import json
import time
import random

project_id = "YOUR_PROJECT_ID"  # Replace with your GCP project ID
topic_id = "database-changes"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

def publish_message(data):
    data_str = json.dumps(data).encode("utf-8")
    future = publisher.publish(topic_path, data=data_str)
    print(f"Published message ID: {future.result()}")

def simulate_change():
    change_types = ["INSERT", "UPDATE", "DELETE"]
    change_type = random.choice(change_types)
    record_id = random.randint(1, 100)
    timestamp = time.time()
    change_data = {
        "change_type": change_type,
        "record_id": record_id,
        "timestamp": timestamp,
        "data": {"field1": "value1", "field2": "value2"}, #Place holder data.
    }
    publish_message(change_data)

if __name__ == "__main__":
    while True:
        simulate_change()
        time.sleep(2)  # Simulate changes every 2 seconds

Reemplaza YOUR_PROJECT_ID por el ID de tu proyecto de GCP real.

  1. Instala la biblioteca cliente de Pub/Sub:
pip install google-cloud-pubsub
  1. Ejecuta la secuencia de comandos en tu terminal. Esta secuencia de comandos se ejecutará de forma continua y publicará mensajes cada 2 segundos en el tema de Pub/Sub.
python simulate_cdc.py
  1. Después de ejecutar la secuencia de comandos durante 1 minuto, por ejemplo, tendrás suficientes mensajes en Pub/Sub para consumir. Para detener el script de Python en ejecución, presiona Ctrl + C o Cmd + C, según tu SO.
  2. Ver mensajes publicados:

Abre otra terminal y ejecuta el siguiente comando para ver los mensajes publicados:

gcloud pubsub subscriptions pull --auto-ack change-subscriber

Deberías ver una fila de la tabla que contiene el mensaje y otros campos:

{"change_type": "UPDATE", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}

Explicación

  • La secuencia de comandos de Python simula los cambios en la base de datos generando de forma aleatoria eventos INSERT, UPDATE o DELETE.
  • Cada cambio se representa como un objeto JSON que contiene el tipo de cambio, el ID del registro, la marca de tiempo y los datos.
  • La secuencia de comandos usa la biblioteca cliente de Cloud Pub/Sub para publicar estos eventos de cambio en el tema database-changes.
  • El comando de suscriptor te permite ver los mensajes que se envían al tema de Pub/Sub.

4. Crea una cuenta de servicio para Dataproc

En esta sección, crearás una cuenta de servicio que el clúster de Dataproc puede usar. También puedes asignar los permisos necesarios para permitir que las instancias de clústeres accedan a Cloud Pub/Sub y Dataproc.

  1. Crea una cuenta de servicio:
export SERVICE_ACCOUNT_NAME="dataproc-service-account"

gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
  1. Agrega el rol de trabajador de Dataproc para permitir que la cuenta de servicio cree clústeres y ejecute trabajos. Agrega el ID de la cuenta de servicio que se generó en el comando anterior como miembro en el siguiente comando:
export PROJECT=$(gcloud info --format='value(config.project)')

gcloud projects add-iam-policy-binding $PROJECT \
        --role roles/dataproc.worker \
        --member="serviceAccount:<your-service-account-with-domain>"
  1. Agrega el rol de suscriptor de Pub/Sub para permitir que la cuenta de servicio se suscriba a la suscripción de Pub/Sub "change-subscriber":
gcloud beta pubsub subscriptions add-iam-policy-binding \
        change-subscriber \
        --role roles/pubsub.subscriber \
        --member="serviceAccount:<your-service-account-with-domain"

5. Cree un clúster de Dataproc

El clúster de Dataproc ejecutará la app de Spark, que procesará los mensajes en Pub/Sub. Necesitarás la cuenta de servicio que creaste en la sección anterior. Dataproc asigna esta cuenta de servicio a todas las instancias del clúster para que todas estas obtengan los permisos correctos a fin de ejecutar la app.

Usa el siguiente comando para crear un clúster de Dataproc:

gcloud dataproc clusters create cdc-dataproc-cluster \
    --region=us-central1 \
    --zone=us-central1-a \
    --scopes=pubsub,datastore \
    --image-version=1.3 \
    --service-account="<your-service-account-with-domain-id>"

6. Envía el trabajo de Spark al clúster de Dataproc

La app de transmisión de Spark procesa los mensajes de cambio de la base de datos en Pub/Sub y los imprime en la consola.

Pasos

  1. Crea un directorio y agrega el código fuente del consumidor al archivo PubsubConsumer.scala
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala 
package demo

import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PubsubConsumer {

  def createContext(projectID: String, checkpointDirectory: String)
    : StreamingContext = {

    // [START stream_setup]
    val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Set the checkpoint directory
    val yarnTags = sparkConf.get("spark.yarn.tags")
    val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
    ssc.checkpoint(checkpointDirectory + '/' + jobId)
    
    // Create stream
    val messagesStream: DStream[String] = PubsubUtils
      .createStream(
        ssc,
        projectID,
        None,
        "change-subscriber",  // Cloud Pub/Sub subscription for incoming database updates
        SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
      .map(message => new String(message.getData(), StandardCharsets.UTF_8))
    // [END stream_setup]

    processStringDStream(messagesStream)
    
        ssc
  }

  def processStringDStream(stringDStream: DStream[String]): Unit = {
    stringDStream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val listOfStrings: List[String] = rdd.collect().toList
        listOfStrings.foreach(str => println(s"message received: $str"))
      } else {
        println("looking for message...")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("arguments are not passed correctly!")
      System.exit(1)
    }

    val Seq(projectID, checkpointDirectory) = args.toSeq

    // Create Spark context
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => createContext(projectID, checkpointDirectory))

    // Start streaming until we receive an explicit termination
    ssc.start()
    ssc.awaitTermination()
  }

}
  1. Crea y agrega lo siguiente a pom.xml
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <jvm.options.xms>-Xms512M</jvm.options.xms>
    <jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
    <jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
    <jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
  </properties>

  <groupId>dataproc-spark-demos</groupId>
  <artifactId>spark-streaming-pubsub-demo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.2.0</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-datastore</artifactId>
      <version>1.34.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>spark-streaming-pubsub_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>

    <dependency>
      <groupId>org.scalacheck</groupId>
      <artifactId>scalacheck_2.11</artifactId>
      <version>1.14.0</version>
      <scope>test</scope>
    </dependency>

  </dependencies>

  <build>
    <plugins>

      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.3.2</version>
        <executions>
          <execution>
            <id>compile</id>
            <goals>
              <goal>compile</goal>
            </goals>
            <phase>compile</phase>
          </execution>
          <execution>
            <id>test-compile</id>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <phase>test-compile</phase>
          </execution>
          <execution>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>demo.PubsubConsumer</mainClass>
                </transformer>
              </transformers>
              <relocations>
                <relocation>
                  <pattern>com</pattern>
                  <shadedPattern>repackaged.com</shadedPattern>
                  <includes>
                    <include>com.google.protobuf.**</include>
                    <include>com.google.common.**</include>
                  </includes>
                </relocation>
              </relocations>
            </configuration>
          </execution>
        </executions>
      </plugin>

    </plugins>
  </build>

</project>
  1. Cambia al directorio spark del proyecto y guarda la ruta de acceso en una variable de entorno para usarla más adelante:
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
  1. Cambia el directorio:
cd $REPO_ROOT/spark
  1. Descarga Java 1.8 y coloca la carpeta en /usr/lib/jvm/. Luego, cambia JAVA_HOME para que apunte a lo siguiente:
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
  1. Compila el archivo JAR de la aplicación
mvn clean package

El archivo spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar que contiene el código de la aplicación y las dependencias se crea en el directorio spark/target.

  1. Envía la aplicación de Spark:
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"

gcloud dataproc jobs submit spark \
    --cluster cdc-dataproc-cluster \
    --region us-central1 \
    --async \
    --jar target/$JAR \
    --max-failures-per-hour 10 \
    --properties $SPARK_PROPERTIES \
    -- $ARGUMENTS
  1. Muestra la lista de los trabajos activos y anota el valor JOB_ID para el trabajo:
gcloud dataproc jobs list --region=us-central1 --state-filter=active

El resultado será similar al siguiente:

JOB_ID                            TYPE   STATUS
473ecb6d14e2483cb88a18988a5b2e56  spark  RUNNING
  1. La salida del trabajo se puede ver cuando abres la siguiente URL en tu navegador. Reemplaza [JOB_ID] por el valor que anotaste en el paso anterior.
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
  1. El resultado es similar a este:
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...

El trabajo de transmisión de Spark que se ejecuta en Dataproc extrae mensajes de Pub/Sub, los procesa y muestra el resultado en la consola.

  1. Finaliza el trabajo: Ejecuta el siguiente comando para finalizar el trabajo. Reemplaza JOB_ID por el mismo que anotamos antes.
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet

¡Felicitaciones! Acabas de crear una potente canalización de CDC que captura los cambios de la base de datos en Pub/Sub y los procesa con Spark Streaming que se ejecuta en Cloud Dataproc.

7. Limpia

Limpia todos los recursos que creaste para evitar que se apliquen cargos en el futuro. La manera más fácil de eliminar la facturación es borrar el proyecto que creaste para el instructivo. También puedes borrar recursos individuales.

Ejecuta los siguientes comandos para borrar recursos individuales.

gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics delete database-changes --quiet
gcloud pubsub subscriptions delete change-subscriber --quiet
gcloud iam service-accounts delete <your-service-account-with-domain> --quiet

8. Felicitaciones

¡Felicitaciones! Acabas de completar un codelab práctico que muestra cómo compilar una canalización de datos sólida en tiempo real con Google Cloud Platform. Repasemos lo que lograste:

  • Captura de datos modificados (CDC) simulada: Aprendiste los conceptos básicos de la CDC y, luego, implementaste una secuencia de comandos de Python para simular cambios en la base de datos, lo que generó eventos que representan modificaciones de datos en tiempo real.
  • Cloud Pub/Sub aprovechado: Configuras temas y suscripciones de Cloud Pub/Sub, lo que proporciona un servicio de mensajería escalable y confiable para transmitir tus eventos de CDC. Publicaste los cambios simulados de la base de datos en Pub/Sub, lo que creó un flujo de datos en tiempo real.
  • Datos procesados con Dataproc y Spark: Aprovisionaste un clúster de Dataproc y, luego, implementaste un trabajo de Spark Streaming para consumir mensajes de tu suscripción a Pub/Sub. Procesaste y transformaste los eventos de CDC entrantes en tiempo real, y mostraste los resultados en los registros de trabajos de Dataproc.
  • Creaste una canalización en tiempo real de extremo a extremo: Integraste correctamente estos servicios para crear una canalización de datos completa que captura, transmite y procesa los cambios de datos en tiempo real. Adquiriste experiencia práctica en la creación de un sistema que puede controlar transmisiones de datos continuas.
  • Usaste el conector de Spark Pub/Sub: Configuraste correctamente un clúster de Dataproc para usar el conector de Spark Pub/Sub, que es fundamental para que Spark Structured Streaming lea datos de Pub/Sub.

Ahora tienes una base sólida para crear canalizaciones de datos más complejas y sofisticadas para diversas aplicaciones, como análisis en tiempo real, almacenamiento de datos y arquitecturas de microservicios. ¡Sigue explorando y creando!

Documentos de referencia