Building Change Data Capture using Dataproc and Cloud Pub/Sub

1. Introduction

Last Updated: 2025-06-19

What is Change Data Capture?

Change Data Capture (CDC) is a set of software design patterns used to determine and track data that has changed in a database. In simpler terms, it's a way of capturing and recording changes made to data so that those changes can be replicated to other systems.
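
To make the idea of replicating changes concrete, here is a minimal Python sketch that replays an ordered list of change events into an in-memory copy of a table. The event fields (`change_type`, `record_id`, `data`) and the helper names `apply_change` and `replicate` are assumptions chosen for illustration; real CDC tools define their own event formats.

```python
from typing import Any, Dict, Iterable

Row = Dict[str, Any]


def apply_change(replica: Dict[int, Row], event: Dict[str, Any]) -> None:
    """Apply one change event to an in-memory replica keyed by record ID."""
    op = event["change_type"]
    key = event["record_id"]
    if op == "INSERT":
        replica[key] = dict(event["data"])
    elif op == "UPDATE":
        # Merge the changed fields into the row (creating it if it is missing).
        replica.setdefault(key, {}).update(event["data"])
    elif op == "DELETE":
        # Deleting a row that is already absent is treated as a no-op.
        replica.pop(key, None)
    else:
        raise ValueError(f"unknown change type: {op!r}")


def replicate(events: Iterable[Dict[str, Any]]) -> Dict[int, Row]:
    """Replay an ordered stream of change events into a fresh replica."""
    replica: Dict[int, Row] = {}
    for event in events:
        apply_change(replica, event)
    return replica


# Hypothetical change log for a two-row table.
changes = [
    {"change_type": "INSERT", "record_id": 1, "data": {"name": "Ada"}},
    {"change_type": "INSERT", "record_id": 2, "data": {"name": "Bob"}},
    {"change_type": "UPDATE", "record_id": 1, "data": {"name": "Ada L."}},
    {"change_type": "DELETE", "record_id": 2, "data": {}},
]
print(replicate(changes))  # {1: {'name': 'Ada L.'}}
```

The replica ends up matching the source table only if events are applied in the order the changes happened, which is why ordering matters in any CDC pipeline.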

CDC is useful across a wide range of data-driven scenarios, including data migration, real-time data warehousing and analytics, disaster recovery and high availability, and audit and compliance.

Data Migration

CDC simplifies data migration projects by allowing for incremental data transfer, reducing downtime and minimizing disruption.

Real-time Data Warehousing and Analytics

CDC ensures that data warehouses and analytical systems are constantly updated with the latest changes from operational databases.

This allows businesses to make decisions based on real-time information.

Disaster Recovery and High Availability

CDC enables real-time replication of data to secondary databases for disaster recovery purposes. In the event of a failure, CDC allows for quick failover to a secondary database, minimizing downtime and data loss.

Audit and Compliance

CDC provides a detailed audit trail of data changes, which is essential for compliance and regulatory requirements.

What you'll build

In this codelab, you're going to build a change data capture (CDC) pipeline using Cloud Pub/Sub, Dataproc, Python, and Apache Spark. Your pipeline will:

  • Simulate database changes and publish them as events to Cloud Pub/Sub, a scalable and reliable messaging service.
  • Use Dataproc, Google Cloud's managed Spark and Hadoop service, to process these events in real time.

By connecting these services, you'll create a robust pipeline capable of capturing and processing data changes as they occur, providing a foundation for real-time analytics, data warehousing, and other critical applications.

What you'll learn

This codelab is focused on Dataproc and Cloud Pub/Sub. Non-relevant concepts and code blocks are glossed over and are provided for you to simply copy and paste.

What you'll need

  • An active GCP account with a project set up. If you don't have one, you can sign up for a free trial.
  • The gcloud CLI, installed and configured.
  • Python 3.7+ installed for simulating database changes and interacting with Pub/Sub.
  • Basic knowledge of Dataproc, Cloud Pub/Sub, Apache Spark, and Python.

Before you start

Execute the following command in the terminal to enable the required APIs:

gcloud services enable \
    dataproc.googleapis.com \
    pubsub.googleapis.com

2. Set up Cloud Pub/Sub

Create a Topic

This topic is used to publish the database changes. A Dataproc job will consume these messages and process them for change data capture. To learn more about topics, see the official documentation here.

gcloud pubsub topics create database-changes

Create a Subscription

Create a subscription that will be used to consume the messages published to the topic. To learn more about subscriptions, see the official documentation here.

gcloud pubsub subscriptions create --topic database-changes change-subscriber
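
The relationship between a topic and its subscriptions can be pictured with a small toy model. The sketch below is plain Python and is not the Pub/Sub client API; the class `ToyTopic` and its methods are hypothetical names used only to illustrate three behaviors: a subscription receives messages published after it exists, each subscription keeps its own backlog, and an acknowledged message is not delivered again to that subscription.

```python
from collections import deque
from typing import Deque, Dict, List


class ToyTopic:
    """In-memory stand-in for a topic; NOT the Pub/Sub client API."""

    def __init__(self) -> None:
        self._subscriptions: Dict[str, Deque[str]] = {}

    def subscribe(self, name: str) -> None:
        # A subscription only receives messages published after it exists.
        self._subscriptions.setdefault(name, deque())

    def publish(self, message: str) -> None:
        # Every subscription gets its own copy of the message.
        for backlog in self._subscriptions.values():
            backlog.append(message)

    def pull_and_ack(self, name: str, limit: int = 1) -> List[str]:
        # Acknowledged messages leave that subscription's backlog for good.
        backlog = self._subscriptions[name]
        return [backlog.popleft() for _ in range(min(limit, len(backlog)))]


topic = ToyTopic()
topic.subscribe("change-subscriber")
topic.publish("event-1")
topic.publish("event-2")
inspected = topic.pull_and_ack("change-subscriber")
remaining = topic.pull_and_ack("change-subscriber", limit=10)
print(inspected, remaining)  # ['event-1'] ['event-2']
```

This matters later in the codelab, where the same subscription is read both from the command line and by the Spark job: a message acknowledged by one reader is no longer available to the other.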

3. Simulate Database Changes

Steps

  1. Create a Python script (e.g., simulate_cdc.py) to simulate database changes and publish them to Pub/Sub.
from google.cloud import pubsub_v1
import json
import time
import random

project_id = "YOUR_PROJECT_ID"  # Replace with your GCP project ID
topic_id = "database-changes"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

def publish_message(data):
    data_str = json.dumps(data).encode("utf-8")
    future = publisher.publish(topic_path, data=data_str)
    print(f"Published message ID: {future.result()}")

def simulate_change():
    change_types = ["INSERT", "UPDATE", "DELETE"]
    change_type = random.choice(change_types)
    record_id = random.randint(1, 100)
    timestamp = time.time()
    change_data = {
        "change_type": change_type,
        "record_id": record_id,
        "timestamp": timestamp,
        "data": {"field1": "value1", "field2": "value2"}, #Place holder data.
    }
    publish_message(change_data)

if __name__ == "__main__":
    while True:
        simulate_change()
        time.sleep(2)  # Simulate changes every 2 seconds

Replace YOUR_PROJECT_ID with your actual GCP project ID.

  2. Install the Pub/Sub client library:
pip install google-cloud-pubsub
  3. Run the script in your terminal. The script runs continuously and publishes a message to the Pub/Sub topic every 2 seconds.
python simulate_cdc.py
  4. After the script has run for about a minute, there will be enough messages in Pub/Sub to consume. Stop the script by pressing Ctrl+C.
  5. View the published messages:

Open another terminal and run the following command to view the published messages:

gcloud pubsub subscriptions pull --auto-ack change-subscriber

You should see a table row containing the message and other fields:

{"change_type": "UPDATE", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}
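
The message body shown above is UTF-8 encoded JSON. As a minimal sketch of how a consumer might decode and sanity-check such a payload before acting on it, consider the following. The function name `parse_change_event` and the specific checks are assumptions for illustration, based only on the fields the simulator script emits.

```python
import json
from typing import Any, Dict

REQUIRED_FIELDS = ("change_type", "record_id", "timestamp", "data")
VALID_CHANGE_TYPES = {"INSERT", "UPDATE", "DELETE"}


def parse_change_event(payload: bytes) -> Dict[str, Any]:
    """Decode a UTF-8 JSON payload and check the fields the simulator emits."""
    event = json.loads(payload.decode("utf-8"))
    if not isinstance(event, dict):
        raise ValueError("payload is not a JSON object")
    missing = [name for name in REQUIRED_FIELDS if name not in event]
    if missing:
        raise ValueError(f"missing fields: {missing}")
    if event["change_type"] not in VALID_CHANGE_TYPES:
        raise ValueError(f"unexpected change_type: {event['change_type']!r}")
    return event


# Payload copied from the sample output above.
sample = (
    b'{"change_type": "UPDATE", "record_id": 10, '
    b'"timestamp": 1742466264.888465, '
    b'"data": {"field1": "value1", "field2": "value2"}}'
)
event = parse_change_event(sample)
print(event["change_type"], event["record_id"])  # UPDATE 10
```

Rejecting malformed events early keeps bad input from reaching whatever logic applies the changes downstream.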

Explanation

  • The Python script simulates database changes by randomly generating INSERT, UPDATE, or DELETE events.
  • Each change is represented as a JSON object containing the change type, record ID, timestamp, and data.
  • The script uses the Cloud Pub/Sub client library to publish these change events to the database-changes topic.
  • The gcloud pull command lets you view the messages that were sent to the Pub/Sub topic.
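
Because the simulator picks the change type and record ID independently at random, it can emit an UPDATE or DELETE for a record that was never inserted. That is fine for this codelab, where events are only printed. If you want a stream that a replica could actually apply, one possible variation is to track which IDs currently exist. The class below is a sketch under that assumption; `ChangeSimulator` and its parameters are hypothetical names, and publishing is left out so the example stays self-contained.

```python
import random
import time
from typing import Any, Dict, Optional, Set


class ChangeSimulator:
    """Generates change events that stay consistent with a set of live IDs."""

    def __init__(self, max_id: int = 100, seed: Optional[int] = None) -> None:
        self._rng = random.Random(seed)
        self._max_id = max_id
        self._live_ids: Set[int] = set()

    def next_change(self) -> Dict[str, Any]:
        free_ids = [i for i in range(1, self._max_id + 1) if i not in self._live_ids]
        # Only offer operations that make sense for the current state.
        options = []
        if free_ids:
            options.append("INSERT")
        if self._live_ids:
            options += ["UPDATE", "DELETE"]
        change_type = self._rng.choice(options)
        if change_type == "INSERT":
            record_id = self._rng.choice(free_ids)
            self._live_ids.add(record_id)
        else:
            record_id = self._rng.choice(sorted(self._live_ids))
            if change_type == "DELETE":
                self._live_ids.remove(record_id)
        return {
            "change_type": change_type,
            "record_id": record_id,
            "timestamp": time.time(),
            "data": {"field1": "value1", "field2": "value2"},  # Placeholder data.
        }


simulator = ChangeSimulator(max_id=5, seed=42)
first = simulator.next_change()
print(first["change_type"])  # INSERT (nothing exists yet, so no other choice)
```

Each returned dictionary has the same shape as the events in the original script, so it could be passed to `publish_message` unchanged.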

4. Create a service account for Dataproc

In this section, you create a service account that the Dataproc cluster can use. You also assign the permissions that allow the cluster instances to access Cloud Pub/Sub and Dataproc.

  1. Create a service account:
export SERVICE_ACCOUNT_NAME="dataproc-service-account"

gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
  2. Add the Dataproc worker role to allow the service account to create clusters and run jobs. Use the email address of the service account created in the previous step as the member in the command below:
export PROJECT=$(gcloud info --format='value(config.project)')

gcloud projects add-iam-policy-binding $PROJECT \
        --role roles/dataproc.worker \
        --member="serviceAccount:<your-service-account-with-domain>"
  3. Add the Pub/Sub subscriber role to allow the service account to subscribe to the "change-subscriber" Pub/Sub subscription:
gcloud beta pubsub subscriptions add-iam-policy-binding \
        change-subscriber \
        --role roles/pubsub.subscriber \
        --member="serviceAccount:<your-service-account-with-domain>"
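
The `<your-service-account-with-domain>` placeholder stands for the service account's email address. For a user-created service account in a project whose ID has no domain prefix, that address normally takes the form `NAME@PROJECT_ID.iam.gserviceaccount.com`. The short sketch below builds the `--member` value from those two parts; the helper names and the project ID `my-project` are hypothetical.

```python
def service_account_email(account_name: str, project_id: str) -> str:
    """Build the email address of a user-created service account."""
    return f"{account_name}@{project_id}.iam.gserviceaccount.com"


def iam_member(account_name: str, project_id: str) -> str:
    """Build the value expected by the --member flag for a service account."""
    return f"serviceAccount:{service_account_email(account_name, project_id)}"


# "my-project" is a made-up project ID used only for this example.
print(iam_member("dataproc-service-account", "my-project"))
# serviceAccount:dataproc-service-account@my-project.iam.gserviceaccount.com
```

You can confirm the actual address for your project with `gcloud iam service-accounts list`.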

5. Create a Dataproc Cluster

The Dataproc cluster runs the Spark app that processes the messages from Pub/Sub. You need the service account created in the previous section. Dataproc assigns this service account to every instance in the cluster so that all instances have the permissions required to run the app.

Use the following command to create a Dataproc cluster:

gcloud dataproc clusters create cdc-dataproc-cluster \
    --region=us-central1 \
    --zone=us-central1-a \
    --scopes=pubsub,datastore \
    --image-version=1.3 \
    --service-account="<your-service-account-with-domain-id>"

6. Submit the Spark Job to the Dataproc Cluster

The Spark Streaming app reads the database change messages from Pub/Sub and prints them to the console.

Steps

  1. Create the project directory and an empty PubsubConsumer.scala file, then add the consumer source code shown after the commands to that file:
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala 
package demo

import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PubsubConsumer {

  def createContext(projectID: String, checkpointDirectory: String)
    : StreamingContext = {

    // [START stream_setup]
    val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Set the checkpoint directory
    val yarnTags = sparkConf.get("spark.yarn.tags")
    val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
    ssc.checkpoint(checkpointDirectory + '/' + jobId)
    
    // Create stream
    val messagesStream: DStream[String] = PubsubUtils
      .createStream(
        ssc,
        projectID,
        None,
        "change-subscriber",  // Cloud Pub/Sub subscription for incoming database updates
        SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
      .map(message => new String(message.getData(), StandardCharsets.UTF_8))
    // [END stream_setup]

    processStringDStream(messagesStream)

    ssc
  }

  def processStringDStream(stringDStream: DStream[String]): Unit = {
    stringDStream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val listOfStrings: List[String] = rdd.collect().toList
        listOfStrings.foreach(str => println(s"message received: $str"))
      } else {
        println("looking for message...")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("arguments are not passed correctly!")
      System.exit(1)
    }

    val Seq(projectID, checkpointDirectory) = args.toSeq

    // Create Spark context
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => createContext(projectID, checkpointDirectory))

    // Start streaming until we receive an explicit termination
    ssc.start()
    ssc.awaitTermination()
  }

}
  2. Create pom.xml and add the following to it:
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <jvm.options.xms>-Xms512M</jvm.options.xms>
    <jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
    <jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
    <jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
  </properties>

  <groupId>dataproc-spark-demos</groupId>
  <artifactId>spark-streaming-pubsub-demo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.2.0</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-datastore</artifactId>
      <version>1.34.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>spark-streaming-pubsub_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>

    <dependency>
      <groupId>org.scalacheck</groupId>
      <artifactId>scalacheck_2.11</artifactId>
      <version>1.14.0</version>
      <scope>test</scope>
    </dependency>

  </dependencies>

  <build>
    <plugins>

      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.3.2</version>
        <executions>
          <execution>
            <id>compile</id>
            <goals>
              <goal>compile</goal>
            </goals>
            <phase>compile</phase>
          </execution>
          <execution>
            <id>test-compile</id>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <phase>test-compile</phase>
          </execution>
          <execution>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>demo.PubsubConsumer</mainClass>
                </transformer>
              </transformers>
              <relocations>
                <relocation>
                  <pattern>com</pattern>
                  <shadedPattern>repackaged.com</shadedPattern>
                  <includes>
                    <include>com.google.protobuf.**</include>
                    <include>com.google.common.**</include>
                  </includes>
                </relocation>
              </relocations>
            </configuration>
          </execution>
        </executions>
      </plugin>

    </plugins>
  </build>

</project>
  3. Change to the project's root directory and save its path in an environment variable for later use:
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
  4. Change to the spark directory:
cd $REPO_ROOT/spark
  5. Download Java 1.8 and place the folder in /usr/lib/jvm/. Then set JAVA_HOME to point to it:
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
  6. Build the application jar:
mvn clean package

The spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar archive, which contains the application code and its dependencies, is created in the spark/target directory.

  7. Submit the Spark application:
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"

gcloud dataproc jobs submit spark \
    --cluster cdc-dataproc-cluster \
    --region us-central1 \
    --async \
    --jar target/$JAR \
    --max-failures-per-hour 10 \
    --properties $SPARK_PROPERTIES \
    -- $ARGUMENTS
  8. Display the list of active jobs and note the JOB_ID value for the job:
gcloud dataproc jobs list --region=us-central1 --state-filter=active

The output will look similar to the following:

JOB_ID                            TYPE   STATUS
473ecb6d14e2483cb88a18988a5b2e56  spark  RUNNING
  9. View the job output by opening the following URL in your browser. Replace [JOB_ID] with the value noted in the previous step.
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
  10. The output is similar to the following:
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...

The Spark Streaming job running on Dataproc pulls messages from Pub/Sub, processes them, and prints the output to the console.

  11. Terminate the job by running the following command. Replace [JOB_ID] with the value you noted earlier:
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet

Congrats! You just created a CDC pipeline that captures database changes in Pub/Sub and processes them with Spark Streaming running on Cloud Dataproc.
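
Notice that the timestamps in the job output shown earlier are not in increasing order: Pub/Sub does not guarantee delivery order unless ordering keys are used. As one possible next step beyond printing, the sketch below shows in plain Python how a batch of these messages could be summarized by counting change types and keeping only the most recent change per record. The function name `summarize_batch` is hypothetical, and the two messages for record 18 are listed newest first on purpose to show that arrival order does not affect the result.

```python
import json
from collections import Counter
from typing import Any, Dict, Iterable, Tuple


def summarize_batch(
    messages: Iterable[str],
) -> Tuple[Counter, Dict[int, Dict[str, Any]]]:
    """Count change types and keep the newest event for each record ID."""
    events = [json.loads(message) for message in messages]
    counts = Counter(event["change_type"] for event in events)
    latest: Dict[int, Dict[str, Any]] = {}
    # Process oldest first so that newer events overwrite older ones.
    for event in sorted(events, key=lambda e: e["timestamp"]):
        latest[event["record_id"]] = event
    return counts, latest


# Messages taken from the sample job output, deliberately reordered.
batch = [
    '{"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, '
    '"data": {"field1": "value1", "field2": "value2"}}',
    '{"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, '
    '"data": {"field1": "value1", "field2": "value2"}}',
    '{"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, '
    '"data": {"field1": "value1", "field2": "value2"}}',
]
counts, latest = summarize_batch(batch)
print(latest[18]["change_type"])  # UPDATE
```

The same per-batch logic could be expressed inside the Spark job's `foreachRDD` block; it is shown in Python here only to keep the example short and runnable on its own.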

7. Clean up

Clean up any resources you created so you won't be billed for them in the future. The easiest way to eliminate billing is to delete the project you created for the tutorial. Alternatively, you can delete individual resources.

Run the following commands to delete individual resources:

gcloud dataproc clusters delete cdc-dataproc-cluster --region=us-central1 --quiet
gcloud pubsub topics delete database-changes --quiet
gcloud pubsub subscriptions delete change-subscriber --quiet
gcloud iam service-accounts delete <your-service-account-with-domain> --quiet

8. Congratulations

Congratulations! You've just completed a hands-on codelab that demonstrates how to build a real-time data pipeline on Google Cloud. Let's recap what you've accomplished:

  • Simulated Change Data Capture (CDC): You learned the fundamentals of CDC and implemented a Python script to simulate database changes, generating events that represent real-time data modifications.
  • Leveraged Cloud Pub/Sub: You set up Cloud Pub/Sub topics and subscriptions, providing a scalable and reliable messaging service for streaming your CDC events. You published your simulated database changes to Pub/Sub, creating a real-time data stream.
  • Processed Data with Dataproc and Spark: You provisioned a Dataproc cluster and deployed a Spark Streaming job to consume messages from your Pub/Sub subscription. The job read the incoming CDC events in real time and printed them to your Dataproc job logs.
  • Built an End-to-End Real-Time Pipeline: You integrated these services into a complete data pipeline that captures, streams, and processes data changes in real time. You gained practical experience in building a system that can handle continuous data streams.
  • Used the Spark Pub/Sub Connector: You packaged the Apache Bahir Spark Pub/Sub connector into your application jar, which is what allows Spark Streaming to read data from Pub/Sub.

You now have a solid foundation for building more complex and sophisticated data pipelines for various applications, including real-time analytics, data warehousing, and microservices architectures. Keep exploring and building!

Reference docs