1. Giriş
Son Güncelleme: 19.06.2025
Değişiklik verilerini yakalama nedir?
Veri Değişikliği Yakalama (CDC), bir veritabanında değişen verileri belirlemek ve izlemek için kullanılan bir dizi yazılım tasarım kalıbıdır. Daha basit bir ifadeyle, verilerde yapılan değişikliklerin yakalanıp kaydedilmesi ve bu değişikliklerin diğer sistemlere kopyalanması için kullanılan bir yöntemdir.
Değişiklik Veri Yakalama (CDC), veri taşıma, gerçek zamanlı veri ambarı ve analizi, felaket kurtarma ve yüksek kullanılabilirlik, denetim ve uygunluk gibi çeşitli veriye dayalı senaryolarda son derece faydalıdır.
Veri Taşıma
CDC, artımlı veri aktarımına olanak tanıyarak, kapalı kalma süresini azaltarak ve kesintileri en aza indirerek veri taşıma projelerini basitleştirir.
Gerçek zamanlı veri ambarı ve analiz
CDC, veri ambarlarının ve analitik sistemlerin, operasyonel veritabanlarındaki en son değişikliklerle sürekli olarak güncellenmesini sağlar.
Bu sayede işletmeler gerçek zamanlı bilgilere dayalı kararlar alabilir.
Olağanüstü Durum Kurtarma ve Yüksek Kullanılabilirlik
CDC, olağanüstü durum kurtarma amacıyla verilerin ikincil veritabanlarına gerçek zamanlı olarak çoğaltılmasını sağlar. CDC, bir hata durumunda ikincil bir veritabanına hızlı bir şekilde geçiş yapılmasına olanak tanır. Böylece, kapalı kalma süresi ve veri kaybı en aza indirilir.
Denetim ve Uygunluk
CDC, uyumluluk ve yasal şartlar için gerekli olan veri değişikliklerinin ayrıntılı bir denetim günlüğü sağlar.
Oluşturacağınız uygulama
Bu codelab'de Cloud Pub/Sub, Dataproc, Python ve Apache Spark'ı kullanarak bir veri değişikliği yakalama (CDC) veri ardışık düzeni oluşturacaksınız. Ardışık düzeniniz:
- Veritabanı değişikliklerini simüle edin ve ölçeklenebilir ve güvenilir bir mesajlaşma hizmeti olan Cloud Pub/Sub'da etkinlik olarak yayınlayın.
- Bu etkinlikleri gerçek zamanlı olarak işlemek için Google Cloud'un yönetilen Spark ve Hadoop hizmeti olan Dataproc'in gücünden yararlanın.
Bu hizmetleri bağlayarak, veri değişikliklerini gerçekleştikçe yakalayıp işleyebilen güçlü bir ardışık düzen oluşturursunuz. Bu ardışık düzen, gerçek zamanlı analizler, veri ambarı ve diğer kritik uygulamalar için bir temel oluşturur.
Neler öğreneceksiniz?
- Temel bir değişiklik verisi yakalama ardışık düzeni oluşturma
- Akış işleme için Dataproc
- Gerçek zamanlı mesajlaşma için Cloud Pub/Sub
- Apache Spark'ın temelleri
Bu codelab, Dataproc ve Cloud Pub/Sub'a odaklanmaktadır. Alakalı olmayan kavramlar ve kod blokları işaretlenmiştir ve yalnızca kopyalayıp yapıştırmanız için kullanımınıza sunulmuştur.
Gerekenler
- Projesi oluşturulmuş etkin bir GCP hesabınız olmalıdır. Hesabınız yoksa ücretsiz deneme sürümüne kaydolabilirsiniz.
- gcloud CLI yüklenmiş ve yapılandırılmış olmalıdır.
- Veritabanı değişikliklerini simüle etmek ve Pub/Sub ile etkileşimde bulunmak için Python 3.7 veya sonraki bir sürüm yüklü olmalıdır.
- Dataproc, Cloud Pub/Sub, Apache Spark ve Python hakkında temel bilgi sahibi olmanız gerekir.
Başlamadan önce
Gerekli API'leri etkinleştirmek için terminalde aşağıdaki komutu yürütün:
gcloud services enable \
dataproc.googleapis.com \
pubsub.googleapis.com \
2. Cloud Pub/Sub'ı ayarlama
Konu oluşturma
Bu konu, veritabanı değişikliklerini yayınlamak için kullanılır. Dataproc işi bu iletilerin tüketicisi olacak ve değişiklik verilerini yakalamak için iletileri işleyecektir. Topics hakkında daha fazla bilgi edinmek istiyorsanız buradaki resmi dokümanları okuyabilirsiniz.
gcloud pubsub topics create database-changes
Abonelik Oluşturma
Pub/Sub'daki mesajları kullanmak için kullanılacak bir abonelik oluşturun. Abonelikler hakkında daha fazla bilgi edinmek için buradaki resmi dokümanları okuyabilirsiniz.
gcloud pubsub subscriptions create --topic database-changes change-subscriber
3. Veritabanı Değişikliklerini Simüle Etme
Adımlar
- Python komut dosyası oluşturun (ör.
simulate_cdc.py
) kullanarak veritabanı değişikliklerini simüle edebilir ve Pub/Sub'a yayınlayabilirsiniz.
from google.cloud import pubsub_v1
import json
import time
import random
project_id = "YOUR_PROJECT_ID" # Replace with your GCP project ID
topic_id = "database-changes"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
def publish_message(data):
data_str = json.dumps(data).encode("utf-8")
future = publisher.publish(topic_path, data=data_str)
print(f"Published message ID: {future.result()}")
def simulate_change():
change_types = ["INSERT", "UPDATE", "DELETE"]
change_type = random.choice(change_types)
record_id = random.randint(1, 100)
timestamp = time.time()
change_data = {
"change_type": change_type,
"record_id": record_id,
"timestamp": timestamp,
"data": {"field1": "value1", "field2": "value2"}, #Place holder data.
}
publish_message(change_data)
if __name__ == "__main__":
while True:
simulate_change()
time.sleep(2) # Simulate changes every 2 seconds
PROJE_KİMLİĞİNİZ kısmını gerçek GCP proje kimliğinizle değiştirin.
- Pub/Sub istemci kitaplığını yükleyin:
pip install google-cloud-pubsub
- Komut dosyasını terminalinizde çalıştırın. Bu komut dosyası sürekli olarak çalışır ve Pub/Sub konusuna 2 saniyede bir mesaj yayınlar.
python simulate_cdc.py
- Komut dosyasını 1 dakika çalıştırdıktan sonra Pub/Sub'da kullanabileceğiniz yeterli mesajınız olur. İşletim sisteminize bağlı olarak Ctrl + C veya Cmd + C tuşlarına basarak çalışan Python komut dosyasını sonlandırabilirsiniz.
- Yayınlanan mesajları görüntüleme:
Başka bir terminal açıp yayınlanan mesajları görüntülemek için aşağıdaki komutu çalıştırın:
gcloud pubsub subscriptions pull --auto-ack change-subscriber
Mesajın yanı sıra diğer alanları içeren bir tablo satırı görürsünüz:
{"change_type": "UPDATE", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}
Açıklama
- Python komut dosyası,
INSERT
,UPDATE
veyaDELETE
etkinliklerini rastgele oluşturarak veritabanı değişikliklerini simüle eder. - Her değişiklik, değişiklik türünü, kayıt kimliğini, zaman damgasını ve verileri içeren bir JSON nesnesi olarak temsil edilir.
- Komut dosyası, bu değişiklik etkinliklerini
database-changes
konusuna yayınlamak için Cloud Pub/Sub istemci kitaplığını kullanır. - subscriber komutu, pub/sub konusuna gönderilen mesajları görüntülemenize olanak tanır.
4. Dataproc için hizmet hesabı oluşturma
Bu bölümde, Dataproc kümesinin kullanabileceği bir hizmet hesabı oluşturursunuz. Ayrıca, küme örneklerinin Cloud Pub/Sub ve Dataproc'e erişmesine izin vermek için gerekli izinleri de atarsınız.
- Hizmet hesabı oluşturun:
export SERVICE_ACCOUNT_NAME="dataproc-service-account"
gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
- Hizmet hesabının küme oluşturmasına ve iş çalıştırmasına izin vermek için Dataproc çalışan rolünü ekleyin. Önceki komutta oluşturulan hizmet hesabı kimliğini aşağıdaki komutta üye olarak ekleyin:
export PROJECT=$(gcloud info --format='value(config.project)')
gcloud projects add-iam-policy-binding $PROJECT \
--role roles/dataproc.worker \
--member="serviceAccount:<your-service-account-with-domain>"
- Hizmet hesabının "change-subscriber" Pub/Sub aboneliğine abone olmasına izin vermek için Pub/Sub abonesi rolünü ekleyin:
gcloud beta pubsub subscriptions add-iam-policy-binding \
change-subscriber \
--role roles/pubsub.subscriber \
--member="serviceAccount:<your-service-account-with-domain"
5. Dataproc Kümesi Oluşturma
Dataproc kümesi, Pub/Sub'daki mesajları işleyen Spark uygulamasını çalıştırır. Önceki bölümde oluşturulan hizmet hesabına ihtiyacınız vardır. Dataproc, tüm örneklerin uygulamayı çalıştırmak için doğru izinleri alması amacıyla bu hizmet hesabını kümedeki her örneğe atar.
Dataproc kümesi oluşturmak için aşağıdaki komutu kullanın:
gcloud dataproc clusters create cdc-dataproc-cluster \
--region=us-central1 \
--zone=us-central1-a \
--scopes=pubsub,datastore \
--image-version=1.3 \
--service-account="<your-service-account-with-domain-id>"
6. Spark işini Dataproc kümesine gönderme
Spark akış uygulaması, Pub/Sub'daki veritabanı değişikliği mesajlarını işler ve konsola yazdırır.
Adımlar
- Bir dizin oluşturun ve tüketicinin kaynak kodunu PubsubConsumer.scala dosyasına ekleyin
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala
package demo
import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object PubsubConsumer {
def createContext(projectID: String, checkpointDirectory: String)
: StreamingContext = {
// [START stream_setup]
val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
val ssc = new StreamingContext(sparkConf, Seconds(1))
// Set the checkpoint directory
val yarnTags = sparkConf.get("spark.yarn.tags")
val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
ssc.checkpoint(checkpointDirectory + '/' + jobId)
// Create stream
val messagesStream: DStream[String] = PubsubUtils
.createStream(
ssc,
projectID,
None,
"change-subscriber", // Cloud Pub/Sub subscription for incoming database updates
SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
.map(message => new String(message.getData(), StandardCharsets.UTF_8))
// [END stream_setup]
processStringDStream(messagesStream)
ssc
}
def processStringDStream(stringDStream: DStream[String]): Unit = {
stringDStream.foreachRDD { rdd =>
if (!rdd.isEmpty()) {
val listOfStrings: List[String] = rdd.collect().toList
listOfStrings.foreach(str => println(s"message received: $str"))
} else {
println("looking for message...")
}
}
}
def main(args: Array[String]): Unit = {
if (args.length != 2) {
System.err.println("arguments are not passed correctly!")
System.exit(1)
}
val Seq(projectID, checkpointDirectory) = args.toSeq
// Create Spark context
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
() => createContext(projectID, checkpointDirectory))
// Start streaming until we receive an explicit termination
ssc.start()
ssc.awaitTermination()
}
}
- Aşağıdakileri oluşturup pom.xml dosyasına ekleyin
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<jvm.options.xms>-Xms512M</jvm.options.xms>
<jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
<jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
<jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
</properties>
<groupId>dataproc-spark-demos</groupId>
<artifactId>spark-streaming-pubsub-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-datastore</artifactId>
<version>1.34.0</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>spark-streaming-pubsub_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_2.11</artifactId>
<version>1.14.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.3.2</version>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
</goals>
<phase>compile</phase>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
<phase>test-compile</phase>
</execution>
<execution>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>demo.PubsubConsumer</mainClass>
</transformer>
</transformers>
<relocations>
<relocation>
<pattern>com</pattern>
<shadedPattern>repackaged.com</shadedPattern>
<includes>
<include>com.google.protobuf.**</include>
<include>com.google.common.**</include>
</includes>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
- Projenin Spark dizine gidin ve yolu daha sonra kullanmak üzere bir ortam değişkenine kaydedin:
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
- Dizini değiştirin:
cd $REPO_ROOT/spark
- Java 1.8'i indirip klasörü /usr/lib/jvm/ konumuna yerleştirin. Ardından JAVA_HOME'u şunu işaret edecek şekilde değiştirin:
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
- Uygulama jar dosyasını derleme
mvn clean package
Uygulama kodunu ve bağımlılıkları içeren spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar arşivi spark/target
dizininde oluşturulur.
- Spark başvurusunu gönderin:
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"
gcloud dataproc jobs submit spark \
--cluster cdc-dataproc-cluster \
--region us-central1 \
--async \
--jar target/$JAR \
--max-failures-per-hour 10 \
--properties $SPARK_PROPERTIES \
-- $ARGUMENTS
- Etkin işlerin listesini görüntüleyin ve işin
JOB_ID
değerini not edin:
gcloud dataproc jobs list --region=us-central1 --state-filter=active
Çıkış şuna benzer şekilde görünür:
JOB_ID TYPE STATUS
473ecb6d14e2483cb88a18988a5b2e56 spark RUNNING
- Tarayıcınızda aşağıdaki URL'yi açarak iş çıkışını görüntüleyin. [JOB_ID] değerini önceki adımda not ettiğiniz değerle değiştirin.
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
- Çıkış şuna benzer:
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
Dataproc'te çalışan Spark akış işi, Pub/Sub'dan mesajları alır, işler ve çıkışı konsolda gösterir.
- İşi sonlandırma: İşi sonlandırmak için aşağıdaki komutu çalıştırın. JOB_ID değerini daha önce not ettiğimiz değerle değiştirin.
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet
Tebrikler! Pub/Sub'daki veritabanı değişikliklerini yakalayıp Cloud Dataproc'te çalışan Spark akışını kullanarak işleyen güçlü bir CDC ardışık düzeni oluşturdunuz.
7. Temizleme
Oluşturduğunuz kaynakları temizleyerek gelecekte bunlar için faturalandırılmayın. Faturalandırılmanın önüne geçmenin en kolay yolu, eğitim için oluşturduğunuz projeyi silmektir. Alternatif olarak, kaynakları tek tek de silebilirsiniz.
Kaynakları tek tek silmek için aşağıdaki komutları çalıştırın
gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics delete database-changes --quiet
gcloud pubsub subscriptions delete change-subscriber --quiet
gcloud iam service-accounts delete <your-service-account-with-domain> --quiet
8. Tebrikler
Tebrikler. Google Cloud Platform'u kullanarak güçlü bir gerçek zamanlı veri ardışık düzeninin nasıl oluşturulacağını gösteren uygulamalı bir codelab'i tamamladınız. Başarılarınızı özetleyelim:
- Simüle Edilen Değişiklik Verileri Yakalama (CDC): CDC'nin temellerini öğrendiniz ve veritabanı değişikliklerini simüle etmek için bir Python komut dosyası uygulayarak gerçek zamanlı veri değişikliklerini temsil eden etkinlikler oluşturdunuz.
- Cloud Pub/Sub'dan yararlanma: Cloud Pub/Sub konularını ve aboneliklerini ayarlayarak CDC etkinliklerinizi yayınlamak için ölçeklenebilir ve güvenilir bir mesajlaşma hizmeti sağlarsınız. Simülasyonlu veritabanı değişikliklerinizi Pub/Sub'da yayınladınız ve gerçek zamanlı bir veri akışı oluşturdunuz.
- Dataproc ve Spark ile işlenmiş veriler: Pub/Sub aboneliğinizden mesaj tüketmek için bir Dataproc kümesi oluşturdunuz ve bir Spark Streaming işi dağıttınız. Gelen CDC etkinliklerini gerçek zamanlı olarak işleyip dönüştürerek sonuçları Dataproc iş günlüklerinizde görüntülediniz.
- Uçtan uca gerçek zamanlı bir ardışık düzen oluşturma: Veri değişikliklerini gerçek zamanlı olarak yakalayan, yayınlayan ve işleyen eksiksiz bir veri ardışık düzeni oluşturmak için bu hizmetleri başarıyla entegre ettiniz. Sürekli veri akışlarını işleyebilecek bir sistem oluşturma konusunda pratik deneyim kazandınız.
- Spark Pub/Sub bağlayıcısı kullanıldı: Spark Pub/Sub bağlayıcısını kullanacak şekilde başarıyla yapılandırılmış bir Dataproc kümesi, Spark Structured Streaming'in Pub/Sub'dan veri okuması için kritik öneme sahiptir.
Artık anlık analizler, veri ambarları ve mikro hizmet mimarileri gibi çeşitli uygulamalar için daha karmaşık ve gelişmiş veri ardışık düzenleri oluşturmak üzere sağlam bir temele sahipsiniz. Keşfetmeye ve içerik üretmeye devam edin.