Como criar a captura de dados de alteração usando o Dataproc e o Cloud Pub/Sub

1. Introdução

df8070bd84336207.png

Última atualização:19/06/2025

O que é a captura de dados alterados?

A captura de dados de alterações (CDC) é um conjunto de padrões de design de software usados para determinar e rastrear dados que foram alterados em um banco de dados. Em termos mais simples, é uma maneira de capturar e registrar as mudanças feitas nos dados para que elas possam ser replicadas em outros sistemas.

A captura de dados alterados (CDC) é extremamente útil em uma ampla variedade de cenários orientados a dados, como migração de dados, data warehouse e análises em tempo real, recuperação de desastres e alta disponibilidade, auditoria e compliance etc.

Migração de dados

O CDC simplifica os projetos de migração de dados, permitindo a transferência incremental de dados, reduzindo o tempo de inatividade e minimizando a interrupção.

Armazenamento de dados e análises em tempo real

A CDC garante que os data warehouses e sistemas analíticos sejam atualizados constantemente com as mudanças mais recentes dos bancos de dados operacionais.

Isso permite que as empresas tomem decisões com base em informações em tempo real.

Recuperação de desastres e alta disponibilidade

A CDC permite a replicação de dados em tempo real para bancos de dados secundários para fins de recuperação de desastres. Em caso de falha, o CDC permite o failover rápido para um banco de dados secundário, minimizando o tempo de inatividade e a perda de dados.

Auditoria e conformidade

O CDC fornece uma trilha de auditoria detalhada das mudanças de dados, o que é essencial para a conformidade e os requisitos regulatórios.

O que você vai criar

Neste codelab, você vai criar um pipeline de dados de captura de dados alterados (CDC) usando o Cloud Pub/Sub, o Dataproc, o Python e o Apache Spark. O pipeline vai:

  • Simule mudanças no banco de dados e publique-as como eventos no Cloud Pub/Sub, um serviço de mensagens escalonável e confiável.
  • Use o Dataproc, o serviço gerenciado do Spark e do Hadoop do Google Cloud, para processar esses eventos em tempo real.

Ao conectar esses serviços, você cria um pipeline robusto capaz de capturar e processar mudanças de dados à medida que elas ocorrem, fornecendo uma base para análises em tempo real, armazenamento de dados e outras aplicações importantes.

O que você vai aprender

  • Como criar um pipeline básico de captura de dados de alteração
  • Dataproc para processamento de stream
  • Cloud Pub/Sub para mensagens em tempo real
  • Noções básicas do Apache Spark

Este codelab se concentra no Dataproc e no Cloud Pub/Sub. Conceitos e blocos de códigos sem relevância não serão abordados. Eles são incluídos somente para você copiar e colar.

O que é necessário

  • uma conta ativa do GCP com um projeto configurado. Se você não tiver uma conta, inscreva-se para fazer um teste sem custo financeiro.
  • CLI gcloud instalada e configurada.
  • Python 3.7 ou versão mais recente instalado para simular mudanças no banco de dados e interagir com o Pub/Sub.
  • Conhecimento básico do Dataproc, Cloud Pub/Sub, Apache Spark e Python.

Antes de começar

Execute o seguinte comando no terminal para ativar as APIs necessárias:

gcloud services enable \
    dataproc.googleapis.com \
    pubsub.googleapis.com \

2. Configurar o Cloud Pub/Sub

Criar um tópico

Esse tópico será usado para publicar as mudanças no banco de dados. O job do Dataproc será o consumidor dessas mensagens e as processará para a captura de dados de mudança. Para saber mais sobre os tópicos, leia a documentação oficial aqui.

gcloud pubsub topics create database-changes

Criar uma assinatura

Crie uma assinatura que será usada para consumir as mensagens no Pub/Sub. Para saber mais sobre assinaturas, leia a documentação oficial neste link.

gcloud pubsub subscriptions create --topic database-changes change-subscriber

3. Simular mudanças no banco de dados

Etapas

  1. Crie um script Python (por exemplo, simulate_cdc.py) para simular mudanças no banco de dados e publicá-las no Pub/Sub.
from google.cloud import pubsub_v1
import json
import time
import random

project_id = "YOUR_PROJECT_ID"  # Replace with your GCP project ID
topic_id = "database-changes"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

def publish_message(data):
    data_str = json.dumps(data).encode("utf-8")
    future = publisher.publish(topic_path, data=data_str)
    print(f"Published message ID: {future.result()}")

def simulate_change():
    change_types = ["INSERT", "UPDATE", "DELETE"]
    change_type = random.choice(change_types)
    record_id = random.randint(1, 100)
    timestamp = time.time()
    change_data = {
        "change_type": change_type,
        "record_id": record_id,
        "timestamp": timestamp,
        "data": {"field1": "value1", "field2": "value2"}, #Place holder data.
    }
    publish_message(change_data)

if __name__ == "__main__":
    while True:
        simulate_change()
        time.sleep(2)  # Simulate changes every 2 seconds

Substitua YOUR_PROJECT_ID pelo ID do seu projeto do GCP.

  1. Instale a biblioteca de cliente do Pub/Sub:
pip install google-cloud-pubsub
  1. Execute o script no terminal. Esse script será executado continuamente e vai publicar mensagens a cada dois segundos no tópico do Pub/Sub.
python simulate_cdc.py
  1. Depois de executar o script por um minuto, você terá mensagens suficientes no Pub/Sub para consumir. É possível encerrar o script Python em execução pressionando Ctrl + C ou Cmd + C, dependendo do SO.
  2. Conferir as mensagens publicadas:

Abra outro terminal e execute o seguinte comando para conferir as mensagens publicadas:

gcloud pubsub subscriptions pull --auto-ack change-subscriber

Você vai encontrar uma linha de tabela com a mensagem e outros campos:

{"change_type": "UPDATE", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}

Explicação

  • O script Python simula mudanças no banco de dados gerando aleatoriamente eventos INSERT, UPDATE ou DELETE.
  • Cada mudança é representada como um objeto JSON contendo o tipo de mudança, o ID do registro, o carimbo de data/hora e os dados.
  • O script usa a biblioteca de cliente do Cloud Pub/Sub para publicar esses eventos de mudança no tópico database-changes.
  • O comando de inscrição permite que você acesse as mensagens que estão sendo enviadas para o tópico do Pub/Sub.

4. Criar uma conta de serviço para o Dataproc

Nesta seção, você vai criar uma conta de serviço que o cluster do Dataproc pode usar. Você também atribui as permissões necessárias para permitir que as instâncias do cluster acessem o Cloud Pub/Sub e o Dataproc.

  1. Crie uma conta de serviço:
export SERVICE_ACCOUNT_NAME="dataproc-service-account"

gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
  1. Adicione o papel de worker do Dataproc para permitir que a conta de serviço crie clusters e execute jobs. Adicione o ID da conta de serviço gerado no comando anterior como membro no comando abaixo:
export PROJECT=$(gcloud info --format='value(config.project)')

gcloud projects add-iam-policy-binding $PROJECT \
        --role roles/dataproc.worker \
        --member="serviceAccount:<your-service-account-with-domain>"
  1. Adicione o papel de assinante do Pub/Sub para permitir que a conta de serviço assine a assinatura "change-subscriber" do Pub/Sub:
gcloud beta pubsub subscriptions add-iam-policy-binding \
        change-subscriber \
        --role roles/pubsub.subscriber \
        --member="serviceAccount:<your-service-account-with-domain"

5. crie um cluster do Dataproc

O cluster do Dataproc vai executar o app Spark, que processará as mensagens no Pub/Sub. Você vai precisar da conta de serviço criada na seção anterior. O Dataproc atribui essa conta de serviço a todas as instâncias do cluster para que todas as instâncias recebam as permissões corretas para executar o app.

Use o comando abaixo para criar um cluster do Dataproc:

gcloud dataproc clusters create cdc-dataproc-cluster \
    --region=us-central1 \
    --zone=us-central1-a \
    --scopes=pubsub,datastore \
    --image-version=1.3 \
    --service-account="<your-service-account-with-domain-id>"

6. Enviar o job do Spark para o cluster do Dataproc

O app de streaming Spark processa as mensagens de mudança do banco de dados no Pub/Sub e as imprime no console.

Etapas

  1. Crie um diretório e adicione o código-fonte do consumidor ao arquivo PubsubConsumer.scala.
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala 
package demo

import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PubsubConsumer {

  def createContext(projectID: String, checkpointDirectory: String)
    : StreamingContext = {

    // [START stream_setup]
    val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Set the checkpoint directory
    val yarnTags = sparkConf.get("spark.yarn.tags")
    val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
    ssc.checkpoint(checkpointDirectory + '/' + jobId)
    
    // Create stream
    val messagesStream: DStream[String] = PubsubUtils
      .createStream(
        ssc,
        projectID,
        None,
        "change-subscriber",  // Cloud Pub/Sub subscription for incoming database updates
        SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
      .map(message => new String(message.getData(), StandardCharsets.UTF_8))
    // [END stream_setup]

    processStringDStream(messagesStream)
    
        ssc
  }

  def processStringDStream(stringDStream: DStream[String]): Unit = {
    stringDStream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val listOfStrings: List[String] = rdd.collect().toList
        listOfStrings.foreach(str => println(s"message received: $str"))
      } else {
        println("looking for message...")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("arguments are not passed correctly!")
      System.exit(1)
    }

    val Seq(projectID, checkpointDirectory) = args.toSeq

    // Create Spark context
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => createContext(projectID, checkpointDirectory))

    // Start streaming until we receive an explicit termination
    ssc.start()
    ssc.awaitTermination()
  }

}
  1. Crie e adicione o seguinte ao pom.xml
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <jvm.options.xms>-Xms512M</jvm.options.xms>
    <jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
    <jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
    <jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
  </properties>

  <groupId>dataproc-spark-demos</groupId>
  <artifactId>spark-streaming-pubsub-demo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.2.0</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-datastore</artifactId>
      <version>1.34.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>spark-streaming-pubsub_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>

    <dependency>
      <groupId>org.scalacheck</groupId>
      <artifactId>scalacheck_2.11</artifactId>
      <version>1.14.0</version>
      <scope>test</scope>
    </dependency>

  </dependencies>

  <build>
    <plugins>

      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.3.2</version>
        <executions>
          <execution>
            <id>compile</id>
            <goals>
              <goal>compile</goal>
            </goals>
            <phase>compile</phase>
          </execution>
          <execution>
            <id>test-compile</id>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <phase>test-compile</phase>
          </execution>
          <execution>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>demo.PubsubConsumer</mainClass>
                </transformer>
              </transformers>
              <relocations>
                <relocation>
                  <pattern>com</pattern>
                  <shadedPattern>repackaged.com</shadedPattern>
                  <includes>
                    <include>com.google.protobuf.**</include>
                    <include>com.google.common.**</include>
                  </includes>
                </relocation>
              </relocations>
            </configuration>
          </execution>
        </executions>
      </plugin>

    </plugins>
  </build>

</project>
  1. Mude para o diretório do projeto e salve o caminho em uma variável de ambiente para uso posterior:
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
  1. Mude o diretório:
cd $REPO_ROOT/spark
  1. Faça o download do Java 1.8 e coloque a pasta em /usr/lib/jvm/. Em seguida, mude o JAVA_HOME para apontar para este:
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
  1. Criar o jar do aplicativo
mvn clean package

O arquivo spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar que contém o código do aplicativo e as dependências é criado no diretório spark/target.

  1. Envie o aplicativo Spark:
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"

gcloud dataproc jobs submit spark \
    --cluster cdc-dataproc-cluster \
    --region us-central1 \
    --async \
    --jar target/$JAR \
    --max-failures-per-hour 10 \
    --properties $SPARK_PROPERTIES \
    -- $ARGUMENTS
  1. Exiba a lista de jobs ativos e observe o valor de JOB_ID para o trabalho:
gcloud dataproc jobs list --region=us-central1 --state-filter=active

A saída será semelhante a

JOB_ID                            TYPE   STATUS
473ecb6d14e2483cb88a18988a5b2e56  spark  RUNNING
  1. Veja o resultado do job abrindo o URL a seguir no seu navegador. Substitua [JOB_ID] pelo valor anotado na etapa anterior.
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
  1. O resultado será assim:
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...

O job de streaming do Spark em execução no Dataproc extrai mensagens do Pub/Sub, as processa e exibe a saída no console.

  1. Encerrar o job: execute o comando abaixo para encerrar o job. Substitua JOB_ID pelo mesmo que você anotou anteriormente
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet

Parabéns! Você acabou de criar um pipeline de CDC poderoso que captura as alterações do banco de dados no Pub/Sub e as processa usando o streaming do Spark executado no Cloud Dataproc.

7. Limpar

Limpe todos os recursos que você criou para não ser cobrado por eles no futuro. O jeito mais fácil de evitar o faturamento é excluindo o projeto criado para este tutorial. Como alternativa, exclua os recursos individuais.

Execute os comandos abaixo para excluir recursos individuais.

gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics delete database-changes --quiet
gcloud pubsub subscriptions delete change-subscriber --quiet
gcloud iam service-accounts delete <your-service-account-with-domain> --quiet

8. Parabéns

Parabéns! Você acabou de concluir um codelab prático que demonstra como criar um pipeline de dados robusto em tempo real usando o Google Cloud Platform. Vamos recapitular o que você fez:

  • Captura de dados de alteração simulada (CDC): você aprendeu os fundamentos do CDC e implementou um script Python para simular mudanças no banco de dados, gerando eventos que representam modificações de dados em tempo real.
  • Cloud Pub/Sub otimizado:você configura tópicos e assinaturas do Cloud Pub/Sub, fornecendo um serviço de mensagens escalonável e confiável para transmitir seus eventos de CDC. Você publicou as mudanças simuladas no banco de dados no Pub/Sub, criando um fluxo de dados em tempo real.
  • Dados processados com o Dataproc e o Spark:você provisionou um cluster do Dataproc e implantou um job do Spark Streaming para consumir mensagens da sua assinatura do Pub/Sub. Você processou e transformou os eventos de CDC recebidos em tempo real, mostrando os resultados nos registros de jobs do Dataproc.
  • Criação de um pipeline completo em tempo real:você integrou esses serviços para criar um pipeline de dados completo que captura, transmite e processa mudanças de dados em tempo real. Você ganhou experiência prática na criação de um sistema que pode processar streams de dados contínuos.
  • Usou o conector do Pub/Sub do Spark:você configurou um cluster do Dataproc para usar o conector do Pub/Sub do Spark, que é essencial para que o Streaming estruturado do Spark leia dados do Pub/Sub.

Agora você tem uma base sólida para criar pipelines de dados mais complexos e sofisticados para vários aplicativos, incluindo análises em tempo real, data warehouse e arquiteturas de microsserviços. Continue explorando e criando.

Documentos de referência