1. Pengantar

Terakhir Diperbarui: 19-06-2024
Apa itu Pengambilan Data Perubahan?
Pengambilan Data Perubahan (CDC) adalah serangkaian pola desain software yang digunakan untuk menentukan dan melacak data yang telah berubah dalam database. Sederhananya, ini adalah cara untuk merekam dan mencatat perubahan yang dilakukan pada data sehingga perubahan tersebut dapat direplikasi ke sistem lain.
Pengambilan Data Perubahan (CDC) sangat berguna dalam berbagai skenario berbasis data seperti Migrasi Data, Pergudangan dan Analisis Data Real-time, Pemulihan Bencana dan Ketersediaan Tinggi, Audit dan Kepatuhan, dll.
Migrasi Data
CDC menyederhanakan proyek migrasi data dengan memungkinkan transfer data inkremental, mengurangi periode nonaktif, dan meminimalkan gangguan.
Data Warehousing dan Analisis Real-time
CDC memastikan bahwa data warehouse dan sistem analisis terus diperbarui dengan perubahan terbaru dari database operasional.
Dengan begitu, bisnis dapat membuat keputusan berdasarkan informasi real-time.
Pemulihan dari Bencana dan Ketersediaan Tinggi
CDC memungkinkan replikasi data secara real-time ke database sekunder untuk tujuan pemulihan bencana. Jika terjadi kegagalan, CDC memungkinkan failover cepat ke database sekunder, sehingga meminimalkan periode nonaktif dan hilangnya data.
Audit dan Kepatuhan
CDC menyediakan audit trail mendetail tentang perubahan data, yang penting untuk kepatuhan dan persyaratan peraturan.
Yang akan Anda bangun
Dalam codelab ini, Anda akan membangun pipeline data pengambilan data perubahan (CDC) menggunakan Cloud Pub/Sub, Dataproc, Python, dan Apache Spark. Pipeline Anda akan:
- Simulasikan perubahan database dan publikasikan sebagai peristiwa ke Cloud Pub/Sub, layanan pesan yang skalabel dan andal.
- Manfaatkan kecanggihan Dataproc, layanan Spark dan Hadoop terkelola Google Cloud, untuk memproses peristiwa ini secara real-time.
Dengan menghubungkan layanan ini, Anda akan membuat pipeline yang andal yang mampu merekam dan memproses perubahan data saat terjadi, sehingga memberikan fondasi untuk analisis real-time, pergudangan data, dan aplikasi penting lainnya.
Yang akan Anda pelajari
- Cara membuat pipeline pengambilan data perubahan dasar
- Dataproc untuk pemrosesan streaming
- Cloud Pub/Sub untuk pesan real-time
- Dasar-dasar Apache Spark
Codelab ini berfokus pada Dataproc dan Cloud Pub/Sub. Konsep dan blok kode yang tidak relevan akan dibahas sekilas dan disediakan, jadi Anda cukup menyalin dan menempelkannya.
Yang Anda butuhkan
- akun GCP aktif dengan project yang telah disiapkan. Jika belum memilikinya, Anda dapat mendaftar untuk uji coba gratis.
- gcloud CLI telah diinstal dan dikonfigurasi.
- Python 3.7+ diinstal untuk menyimulasikan perubahan database dan berinteraksi dengan Pub/Sub.
- Pengetahuan Dasar tentang Dataproc, Cloud Pub/Sub, Apache Spark, dan Python.
Sebelum memulai
Jalankan perintah berikut di terminal untuk mengaktifkan API yang diperlukan:
gcloud services enable \
dataproc.googleapis.com \
pubsub.googleapis.com \
2. Menyiapkan Cloud Pub/Sub
Membuat Topik
Topik ini akan digunakan untuk memublikasikan perubahan database. Job Dataproc akan menjadi konsumen pesan ini dan akan memproses pesan untuk pengambilan data perubahan. Jika Anda ingin mengetahui lebih lanjut topik, Anda dapat membaca dokumentasi resmi di sini.
gcloud pubsub topics create database-changes
Membuat Langganan
Buat langganan yang akan digunakan untuk menggunakan pesan di Pub/Sub. Untuk mengetahui lebih lanjut langganan, Anda dapat membaca dokumentasi resmi di sini.
gcloud pubsub subscriptions create --topic database-changes change-subscriber
3. Simulasikan Perubahan Database
Langkah
- Buat skrip Python (misalnya,
simulate_cdc.py) untuk menyimulasikan perubahan database dan memublikasikannya ke Pub/Sub.
from google.cloud import pubsub_v1
import json
import time
import random
project_id = "YOUR_PROJECT_ID" # Replace with your GCP project ID
topic_id = "database-changes"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
def publish_message(data):
data_str = json.dumps(data).encode("utf-8")
future = publisher.publish(topic_path, data=data_str)
print(f"Published message ID: {future.result()}")
def simulate_change():
change_types = ["INSERT", "UPDATE", "DELETE"]
change_type = random.choice(change_types)
record_id = random.randint(1, 100)
timestamp = time.time()
change_data = {
"change_type": change_type,
"record_id": record_id,
"timestamp": timestamp,
"data": {"field1": "value1", "field2": "value2"}, #Place holder data.
}
publish_message(change_data)
if __name__ == "__main__":
while True:
simulate_change()
time.sleep(2) # Simulate changes every 2 seconds
Ganti YOUR_PROJECT_ID dengan ID project GCP Anda yang sebenarnya
- Instal library klien Pub/Sub:
pip install google-cloud-pubsub
- Jalankan skrip di terminal Anda. Skrip ini akan berjalan terus-menerus dan memublikasikan pesan setiap 2 detik ke topik Pub/Sub.
python simulate_cdc.py
- Setelah menjalankan skrip selama, misalnya, 1 menit, Anda akan memiliki cukup banyak pesan di Pub/Sub untuk digunakan. Anda dapat menghentikan skrip python yang sedang berjalan dengan menekan ctrl + C atau Cmd + C, bergantung pada OS Anda.
- Melihat Pesan yang Dipublikasikan:
Buka terminal lain dan jalankan perintah berikut untuk melihat pesan yang dipublikasikan:
gcloud pubsub subscriptions pull --auto-ack change-subscriber
Anda akan melihat baris tabel yang berisi pesan dan kolom lainnya:
{"change_type": "UPDATE", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}
Penjelasan
- Skrip Python mensimulasikan perubahan database dengan membuat peristiwa
INSERT,UPDATE, atauDELETEsecara acak. - Setiap perubahan ditampilkan sebagai objek JSON yang berisi jenis perubahan, ID catatan, stempel waktu, dan data.
- Skrip menggunakan library klien Cloud Pub/Sub untuk memublikasikan peristiwa perubahan ini ke topik
database-changes. - Perintah subscriber memungkinkan Anda melihat pesan yang dikirim ke topik pub/sub.
4. Membuat akun layanan untuk Dataproc
Di bagian ini, Anda akan membuat Akun layanan yang dapat digunakan oleh cluster Dataproc. Anda juga menetapkan izin yang diperlukan agar instance cluster dapat mengakses Cloud Pub/Sub dan Dataproc.
- Membuat akun layanan:
export SERVICE_ACCOUNT_NAME="dataproc-service-account"
gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
- Tambahkan peran pekerja Dataproc untuk mengizinkan akun layanan membuat cluster dan menjalankan tugas. Tambahkan ID akun layanan yang dibuat pada perintah sebelumnya sebagai anggota dalam perintah di bawah:
export PROJECT=$(gcloud info --format='value(config.project)')
gcloud projects add-iam-policy-binding $PROJECT \
--role roles/dataproc.worker \
--member="serviceAccount:<your-service-account-with-domain>"
- Tambahkan peran pelanggan Pub/Sub untuk mengizinkan akun layanan berlangganan langganan Pub/Sub "change-subscriber":
gcloud beta pubsub subscriptions add-iam-policy-binding \
change-subscriber \
--role roles/pubsub.subscriber \
--member="serviceAccount:<your-service-account-with-domain"
5. Membuat Cluster Dataproc
Cluster Dataproc akan menjalankan aplikasi Spark yang akan memproses pesan di Pub/Sub. Anda akan memerlukan akun layanan yang dibuat di bagian sebelumnya. Dataproc menetapkan akun layanan ini ke setiap instance di cluster sehingga semua instance mendapatkan izin yang benar untuk menjalankan aplikasi.
Gunakan perintah berikut untuk membuat cluster Dataproc:
gcloud dataproc clusters create cdc-dataproc-cluster \
--region=us-central1 \
--zone=us-central1-a \
--scopes=pubsub,datastore \
--image-version=1.3 \
--service-account="<your-service-account-with-domain-id>"
6. Mengirimkan Tugas Spark ke Cluster Dataproc
Aplikasi streaming Spark memproses pesan perubahan database di Pub/Sub dan mencetaknya ke konsol.
Langkah
- Buat direktori dan tambahkan kode sumber konsumen ke file PubsubConsumer.scala
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala
package demo
import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object PubsubConsumer {
def createContext(projectID: String, checkpointDirectory: String)
: StreamingContext = {
// [START stream_setup]
val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
val ssc = new StreamingContext(sparkConf, Seconds(1))
// Set the checkpoint directory
val yarnTags = sparkConf.get("spark.yarn.tags")
val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
ssc.checkpoint(checkpointDirectory + '/' + jobId)
// Create stream
val messagesStream: DStream[String] = PubsubUtils
.createStream(
ssc,
projectID,
None,
"change-subscriber", // Cloud Pub/Sub subscription for incoming database updates
SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
.map(message => new String(message.getData(), StandardCharsets.UTF_8))
// [END stream_setup]
processStringDStream(messagesStream)
ssc
}
def processStringDStream(stringDStream: DStream[String]): Unit = {
stringDStream.foreachRDD { rdd =>
if (!rdd.isEmpty()) {
val listOfStrings: List[String] = rdd.collect().toList
listOfStrings.foreach(str => println(s"message received: $str"))
} else {
println("looking for message...")
}
}
}
def main(args: Array[String]): Unit = {
if (args.length != 2) {
System.err.println("arguments are not passed correctly!")
System.exit(1)
}
val Seq(projectID, checkpointDirectory) = args.toSeq
// Create Spark context
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
() => createContext(projectID, checkpointDirectory))
// Start streaming until we receive an explicit termination
ssc.start()
ssc.awaitTermination()
}
}
- Buat dan tambahkan kode berikut ke pom.xml
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<jvm.options.xms>-Xms512M</jvm.options.xms>
<jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
<jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
<jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
</properties>
<groupId>dataproc-spark-demos</groupId>
<artifactId>spark-streaming-pubsub-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-datastore</artifactId>
<version>1.34.0</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>spark-streaming-pubsub_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_2.11</artifactId>
<version>1.14.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.3.2</version>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
</goals>
<phase>compile</phase>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
<phase>test-compile</phase>
</execution>
<execution>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>demo.PubsubConsumer</mainClass>
</transformer>
</transformers>
<relocations>
<relocation>
<pattern>com</pattern>
<shadedPattern>repackaged.com</shadedPattern>
<includes>
<include>com.google.protobuf.**</include>
<include>com.google.common.**</include>
</includes>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
- Beralih ke direktori spark project dan simpan jalur dalam variabel lingkungan untuk digunakan nanti:
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
- Ubah direktori:
cd $REPO_ROOT/spark
- Download Java 1.8 dan tempatkan folder di /usr/lib/jvm/. Kemudian, ubah JAVA_HOME Anda untuk mengarah ke direktori ini:
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
- Membangun jar aplikasi
mvn clean package
Arsip spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar yang berisi kode aplikasi dan dependensi dibuat di direktori spark/target
- Kirimkan aplikasi spark:
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"
gcloud dataproc jobs submit spark \
--cluster cdc-dataproc-cluster \
--region us-central1 \
--async \
--jar target/$JAR \
--max-failures-per-hour 10 \
--properties $SPARK_PROPERTIES \
-- $ARGUMENTS
- Tampilkan daftar tugas aktif dan catat nilai
JOB_IDuntuk tugas tersebut:
gcloud dataproc jobs list --region=us-central1 --state-filter=active
Outputnya akan terlihat seperti
JOB_ID TYPE STATUS
473ecb6d14e2483cb88a18988a5b2e56 spark RUNNING
- Lihat output tugas dengan membuka URL berikut di browser Anda. Ganti [JOB_ID] dengan nilai yang dicatat pada langkah sebelumnya.
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
- Outputnya mirip dengan hal berikut ini:
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
Tugas streaming Spark yang berjalan di Dataproc menarik pesan dari Pub/Sub, memprosesnya, dan menampilkan output ke konsol.
- Menghentikan tugas: Jalankan perintah berikut untuk menghentikan tugas. Ganti JOB_ID dengan ID yang sama yang kita catat sebelumnya
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet
Selamat! Anda baru saja membuat pipeline CDC yang canggih yang merekam perubahan database di Pub/Sub dan memprosesnya menggunakan streaming Spark yang berjalan di Cloud Dataproc.
7. Pembersihan
Hapus semua resource yang Anda buat agar Anda tidak ditagih untuk resource tersebut pada masa mendatang. Cara termudah untuk menghilangkan penagihan adalah dengan menghapus project yang Anda buat untuk tutorial. Atau, Anda dapat menghapus setiap resource.
Jalankan perintah berikut untuk menghapus setiap resource
gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics delete database-changes --quiet
gcloud pubsub subscriptions delete change-subscriber --quiet
gcloud iam service-accounts delete <your-service-account-with-domain> --quiet
8. Selamat
Selamat, Anda baru saja menyelesaikan codelab praktik yang menunjukkan cara membangun pipeline data real-time yang andal menggunakan Google Cloud Platform. Mari kita rangkum pencapaian Anda:
- Pengambilan Data Perubahan (CDC) yang Disimulasikan: Anda mempelajari dasar-dasar CDC dan menerapkan skrip Python untuk menyimulasikan perubahan database, yang menghasilkan peristiwa yang merepresentasikan modifikasi data real-time.
- Memanfaatkan Cloud Pub/Sub: Anda menyiapkan topik dan langganan Cloud Pub/Sub, yang menyediakan layanan pengiriman pesan yang skalabel dan andal untuk menstreaming peristiwa CDC Anda. Anda telah memublikasikan perubahan database yang disimulasikan ke Pub/Sub, sehingga membuat aliran data real-time.
- Data yang Diproses dengan Dataproc dan Spark: Anda telah menyediakan cluster Dataproc dan men-deploy tugas Spark Streaming untuk menggunakan pesan dari langganan Pub/Sub Anda. Anda memproses dan mengubah peristiwa CDC yang masuk secara real-time, menampilkan hasilnya di log tugas Dataproc.
- Membangun Pipeline Real-Time End-to-End: Anda berhasil mengintegrasikan layanan ini untuk membuat pipeline data lengkap yang merekam, melakukan streaming, dan memproses perubahan data secara real-time. Anda mendapatkan pengalaman praktis dalam membangun sistem yang dapat menangani aliran data berkelanjutan.
- Menggunakan Spark Pub/Sub Connector: Anda berhasil mengonfigurasi cluster Dataproc untuk menggunakan Spark Pub/Sub Connector, yang penting agar Spark Structured Streaming dapat membaca data dari Pub/Sub.
Sekarang Anda memiliki fondasi yang kuat untuk membangun pipeline data yang lebih kompleks dan canggih untuk berbagai aplikasi, termasuk analisis real-time, data warehousing, dan arsitektur microservice. Teruslah bereksplorasi dan membangun!