Membuat Change Data Capture menggunakan Dataproc dan Cloud Pub/Sub

1. Pengantar

df8070bd84336207.png

Terakhir Diperbarui: 19-06-2025

Apa yang dimaksud dengan Change Data Capture?

Pengambilan Data Perubahan (CDC) adalah serangkaian pola desain software yang digunakan untuk menentukan dan melacak data yang telah berubah dalam database. Dengan kata yang lebih sederhana, ini adalah cara untuk merekam dan mencatat perubahan yang dilakukan pada data sehingga perubahan tersebut dapat direplikasi ke sistem lain.

Pengambilan Data Perubahan (CDC) sangat berguna di berbagai skenario berbasis data seperti Migrasi Data, Analisis dan Penyimpanan Data Real-time, Disaster Recovery dan High Availability, Audit dan Kepatuhan, dll.

Migrasi Data

CDC menyederhanakan project migrasi data dengan memungkinkan transfer data inkremental, mengurangi periode nonaktif, dan meminimalkan gangguan.

Data Warehousing dan Analisis Real-time

CDC memastikan bahwa data warehouse dan sistem analisis terus diperbarui dengan perubahan terbaru dari database operasional.

Hal ini memungkinkan bisnis membuat keputusan berdasarkan informasi real-time.

Pemulihan dari Bencana dan Ketersediaan Tinggi

CDC memungkinkan replikasi data secara real time ke database sekunder untuk tujuan pemulihan dari bencana. Jika terjadi kegagalan, CDC memungkinkan failover cepat ke database sekunder, sehingga meminimalkan periode nonaktif dan hilangnya data.

Audit dan Kepatuhan

CDC memberikan pelacakan audit mendetail tentang perubahan data, yang penting untuk kepatuhan dan persyaratan peraturan.

Yang akan Anda build

Dalam codelab ini, Anda akan membuat pipeline data capture data perubahan (CDC) menggunakan Cloud Pub/Sub, Dataproc, Python, dan Apache Spark. Pipeline Anda akan:

  • Simulasikan perubahan database dan publikasikan sebagai peristiwa ke Cloud Pub/Sub, layanan pesan yang skalabel dan andal.
  • Manfaatkan kecanggihan Dataproc, layanan Spark dan Hadoop terkelola Google Cloud, untuk memproses peristiwa ini secara real time.

Dengan menghubungkan layanan ini, Anda akan membuat pipeline yang andal yang dapat menangkap dan memproses perubahan data saat terjadi, sehingga memberikan dasar untuk analisis real-time, penyimpanan data, dan aplikasi penting lainnya.

Yang akan Anda pelajari

Codelab ini berfokus pada Dataproc dan Cloud Pub/Sub. Konsep dan blok kode yang tidak relevan akan dibahas sekilas dan disediakan, jadi Anda cukup menyalin dan menempelkannya.

Yang Anda butuhkan

  • akun GCP aktif dengan penyiapan project. Jika belum memilikinya, Anda dapat mendaftar untuk uji coba gratis.
  • gcloud CLI diinstal dan dikonfigurasi.
  • Python 3.7+ diinstal untuk menyimulasikan perubahan database dan berinteraksi dengan Pub/Sub.
  • Pengetahuan Dasar tentang Dataproc, Cloud Pub/Sub, Apache Spark, dan Python.

Sebelum memulai

Jalankan perintah berikut di terminal untuk mengaktifkan API yang diperlukan:

gcloud services enable \
    dataproc.googleapis.com \
    pubsub.googleapis.com \

2. Menyiapkan Cloud Pub/Sub

Membuat Topik

Topik ini akan digunakan untuk memublikasikan perubahan database. Tugas Dataproc akan menjadi konsumen pesan ini dan akan memproses pesan untuk pengambilan data perubahan. Jika ingin mengetahui topik lebih lanjut, Anda dapat membaca dokumentasi resmi di sini.

gcloud pubsub topics create database-changes

Membuat Langganan

Buat langganan yang akan digunakan untuk menggunakan pesan di Pub/Sub. Untuk mengetahui lebih lanjut langganan, Anda dapat membaca dokumentasi resmi di sini.

gcloud pubsub subscriptions create --topic database-changes change-subscriber

3. Simulasikan Perubahan Database

Langkah

  1. Buat skrip Python (misalnya, simulate_cdc.py) untuk menyimulasikan perubahan database dan memublikasikannya ke Pub/Sub.
from google.cloud import pubsub_v1
import json
import time
import random

project_id = "YOUR_PROJECT_ID"  # Replace with your GCP project ID
topic_id = "database-changes"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

def publish_message(data):
    data_str = json.dumps(data).encode("utf-8")
    future = publisher.publish(topic_path, data=data_str)
    print(f"Published message ID: {future.result()}")

def simulate_change():
    change_types = ["INSERT", "UPDATE", "DELETE"]
    change_type = random.choice(change_types)
    record_id = random.randint(1, 100)
    timestamp = time.time()
    change_data = {
        "change_type": change_type,
        "record_id": record_id,
        "timestamp": timestamp,
        "data": {"field1": "value1", "field2": "value2"}, #Place holder data.
    }
    publish_message(change_data)

if __name__ == "__main__":
    while True:
        simulate_change()
        time.sleep(2)  # Simulate changes every 2 seconds

Ganti YOUR_PROJECT_ID dengan ID project GCP Anda yang sebenarnya

  1. Instal library klien Pub/Sub:
pip install google-cloud-pubsub
  1. Jalankan skrip di terminal Anda. Skrip ini akan berjalan terus-menerus dan memublikasikan pesan setiap 2 detik ke topik Pub/Sub.
python simulate_cdc.py
  1. Setelah menjalankan skrip selama 1 menit, Anda akan memiliki cukup pesan di Pub/Sub untuk digunakan. Anda dapat menghentikan skrip python yang sedang berjalan dengan menekan ctrl + C atau Cmd + C, bergantung pada OS Anda.
  2. Melihat Pesan yang Dipublikasikan:

Buka terminal lain dan jalankan perintah berikut untuk melihat pesan yang dipublikasikan:

gcloud pubsub subscriptions pull --auto-ack change-subscriber

Anda akan melihat baris tabel yang berisi pesan dan kolom lainnya:

{"change_type": "UPDATE", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}

Penjelasan

  • Skrip Python menyimulasikan perubahan database dengan secara acak menghasilkan peristiwa INSERT, UPDATE, atau DELETE.
  • Setiap perubahan direpresentasikan sebagai objek JSON yang berisi jenis perubahan, ID kumpulan data, stempel waktu, dan data.
  • Skrip ini menggunakan library klien Cloud Pub/Sub untuk memublikasikan peristiwa perubahan ini ke topik database-changes.
  • Perintah pelanggan memungkinkan Anda melihat pesan yang dikirim ke topik pub/sub.

4. Membuat akun layanan untuk Dataproc

Di bagian ini, Anda akan membuat Akun layanan yang dapat digunakan oleh cluster Dataproc. Anda juga menetapkan izin yang diperlukan untuk mengizinkan instance cluster mengakses Cloud Pub/Sub dan Dataproc.

  1. Buat akun layanan:
export SERVICE_ACCOUNT_NAME="dataproc-service-account"

gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
  1. Tambahkan peran pekerja Dataproc untuk mengizinkan akun layanan membuat cluster dan menjalankan tugas. Tambahkan ID akun layanan yang dibuat dalam perintah sebelumnya sebagai anggota dalam perintah di bawah:
export PROJECT=$(gcloud info --format='value(config.project)')

gcloud projects add-iam-policy-binding $PROJECT \
        --role roles/dataproc.worker \
        --member="serviceAccount:<your-service-account-with-domain>"
  1. Tambahkan peran pelanggan Pub/Sub untuk mengizinkan akun layanan berlangganan langganan Pub/Sub "change-subscriber":
gcloud beta pubsub subscriptions add-iam-policy-binding \
        change-subscriber \
        --role roles/pubsub.subscriber \
        --member="serviceAccount:<your-service-account-with-domain"

5. Membuat Cluster Dataproc

Cluster Dataproc akan menjalankan aplikasi spark yang akan memproses pesan di Pub/sub. Anda memerlukan akun layanan yang dibuat di bagian sebelumnya. Dataproc menetapkan akun layanan ini ke setiap instance di cluster sehingga semua instance mendapatkan izin yang benar untuk menjalankan aplikasi.

Gunakan perintah berikut untuk membuat cluster Dataproc:

gcloud dataproc clusters create cdc-dataproc-cluster \
    --region=us-central1 \
    --zone=us-central1-a \
    --scopes=pubsub,datastore \
    --image-version=1.3 \
    --service-account="<your-service-account-with-domain-id>"

6. Mengirimkan Tugas Spark ke Cluster Dataproc

Aplikasi streaming spark memproses pesan perubahan database di Pub/sub dan mencetaknya ke konsol.

Langkah

  1. Buat direktori dan tambahkan kode sumber konsumen ke file PubsubConsumer.scala
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala 
package demo

import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PubsubConsumer {

  def createContext(projectID: String, checkpointDirectory: String)
    : StreamingContext = {

    // [START stream_setup]
    val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Set the checkpoint directory
    val yarnTags = sparkConf.get("spark.yarn.tags")
    val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
    ssc.checkpoint(checkpointDirectory + '/' + jobId)
    
    // Create stream
    val messagesStream: DStream[String] = PubsubUtils
      .createStream(
        ssc,
        projectID,
        None,
        "change-subscriber",  // Cloud Pub/Sub subscription for incoming database updates
        SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
      .map(message => new String(message.getData(), StandardCharsets.UTF_8))
    // [END stream_setup]

    processStringDStream(messagesStream)
    
        ssc
  }

  def processStringDStream(stringDStream: DStream[String]): Unit = {
    stringDStream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val listOfStrings: List[String] = rdd.collect().toList
        listOfStrings.foreach(str => println(s"message received: $str"))
      } else {
        println("looking for message...")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("arguments are not passed correctly!")
      System.exit(1)
    }

    val Seq(projectID, checkpointDirectory) = args.toSeq

    // Create Spark context
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => createContext(projectID, checkpointDirectory))

    // Start streaming until we receive an explicit termination
    ssc.start()
    ssc.awaitTermination()
  }

}
  1. Buat dan tambahkan kode berikut ke pom.xml
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <jvm.options.xms>-Xms512M</jvm.options.xms>
    <jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
    <jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
    <jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
  </properties>

  <groupId>dataproc-spark-demos</groupId>
  <artifactId>spark-streaming-pubsub-demo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.2.0</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-datastore</artifactId>
      <version>1.34.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>spark-streaming-pubsub_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>

    <dependency>
      <groupId>org.scalacheck</groupId>
      <artifactId>scalacheck_2.11</artifactId>
      <version>1.14.0</version>
      <scope>test</scope>
    </dependency>

  </dependencies>

  <build>
    <plugins>

      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.3.2</version>
        <executions>
          <execution>
            <id>compile</id>
            <goals>
              <goal>compile</goal>
            </goals>
            <phase>compile</phase>
          </execution>
          <execution>
            <id>test-compile</id>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <phase>test-compile</phase>
          </execution>
          <execution>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>demo.PubsubConsumer</mainClass>
                </transformer>
              </transformers>
              <relocations>
                <relocation>
                  <pattern>com</pattern>
                  <shadedPattern>repackaged.com</shadedPattern>
                  <includes>
                    <include>com.google.protobuf.**</include>
                    <include>com.google.common.**</include>
                  </includes>
                </relocation>
              </relocations>
            </configuration>
          </execution>
        </executions>
      </plugin>

    </plugins>
  </build>

</project>
  1. Beralihlah ke direktori spark project dan simpan jalur dalam variabel lingkungan untuk digunakan nanti:
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
  1. Ubah direktori:
cd $REPO_ROOT/spark
  1. Download Java 1.8 dan tempatkan folder di /usr/lib/jvm/. Kemudian, ubah JAVA_HOME Anda agar mengarah ke ini:
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
  1. Mem-build jar aplikasi
mvn clean package

Arsip spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar yang berisi kode aplikasi dan dependensi dibuat di direktori spark/target

  1. Kirim aplikasi spark:
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"

gcloud dataproc jobs submit spark \
    --cluster cdc-dataproc-cluster \
    --region us-central1 \
    --async \
    --jar target/$JAR \
    --max-failures-per-hour 10 \
    --properties $SPARK_PROPERTIES \
    -- $ARGUMENTS
  1. Tampilkan daftar tugas aktif dan catat nilai JOB_ID untuk tugas tersebut:
gcloud dataproc jobs list --region=us-central1 --state-filter=active

Outputnya akan terlihat seperti

JOB_ID                            TYPE   STATUS
473ecb6d14e2483cb88a18988a5b2e56  spark  RUNNING
  1. Lihat output tugas dengan membuka URL berikut di browser Anda. Ganti [JOB_ID] dengan nilai yang dicatat di langkah sebelumnya.
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
  1. Outputnya mirip dengan hal berikut ini:
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...

Tugas streaming spark yang berjalan di Dataproc mengambil pesan dari Pub/sub, memprosesnya, dan menampilkan output ke konsol.

  1. Menghentikan tugas: Jalankan perintah berikut untuk menghentikan tugas. Ganti JOB_ID dengan yang sama dengan yang kita catat sebelumnya
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet

Selamat! Anda baru saja membuat pipeline CDC yang efektif yang menangkap perubahan database di Pub/Sub dan memprosesnya menggunakan streaming spark yang berjalan di Cloud Dataproc.

7. Pembersihan

Bersihkan resource yang Anda buat sehingga Anda tidak akan ditagih di masa mendatang. Cara termudah untuk menghilangkan penagihan adalah dengan menghapus project yang Anda buat untuk tutorial. Atau, Anda dapat menghapus resource satu per satu.

Jalankan perintah berikut untuk menghapus resource satu per satu

gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics delete database-changes --quiet
gcloud pubsub subscriptions delete change-subscriber --quiet
gcloud iam service-accounts delete <your-service-account-with-domain> --quiet

8. Selamat

Selamat, Anda baru saja menyelesaikan codelab praktik yang menunjukkan cara membuat pipeline data real-time yang andal menggunakan Google Cloud Platform. Mari kita rangkum pencapaian Anda:

  • Simulasi Pengambilan Data Perubahan (CDC): Anda telah mempelajari dasar-dasar CDC dan menerapkan skrip Python untuk menyimulasikan perubahan database, yang menghasilkan peristiwa yang mewakili modifikasi data real-time.
  • Cloud Pub/Sub yang dimanfaatkan: Anda menyiapkan topik dan langganan Cloud Pub/Sub, yang menyediakan layanan pesan yang skalabel dan andal untuk melakukan streaming peristiwa CDC. Anda memublikasikan perubahan database simulasi ke Pub/Sub, sehingga membuat aliran data real-time.
  • Data yang Diproses dengan Dataproc dan Spark: Anda menyediakan cluster Dataproc dan men-deploy tugas Spark Streaming untuk menggunakan pesan dari langganan Pub/Sub. Anda telah memproses dan mengubah peristiwa CDC yang masuk secara real time, yang menampilkan hasilnya di log tugas Dataproc.
  • Membuat Pipeline Real-Time End-to-End: Anda berhasil mengintegrasikan layanan ini untuk membuat pipeline data lengkap yang merekam, melakukan streaming, dan memproses perubahan data secara real-time. Anda mendapatkan pengalaman praktis dalam membangun sistem yang dapat menangani aliran data berkelanjutan.
  • Menggunakan Konektor Spark Pub/Sub: Anda berhasil mengonfigurasi cluster Dataproc untuk menggunakan konektor Spark Pub/Sub, yang penting bagi Streaming Terstruktur Spark untuk membaca data dari Pub/Sub.

Sekarang Anda memiliki dasar yang kuat untuk membuat pipeline data yang lebih kompleks dan canggih untuk berbagai aplikasi, termasuk analisis real-time, data warehouse, dan arsitektur microservice. Teruslah bereksplorasi dan membangun.

Dokumen referensi