1. Wprowadzenie
Ostatnia aktualizacja: 19.06.2025
Co to jest przechwytywanie zmian danych?
Przechwytywanie zmian danych (CDC) to zbiór wzorów projektowania oprogramowania służących do określania i śledzenia danych, które uległy zmianie w bazie danych. Mówiąc prościej, jest to sposób rejestrowania zmian w danych, aby można je było powielać w innych systemach.
Przechwytywanie zmian danych (CDC) jest niezwykle przydatne w szerokim zakresie scenariuszy opartych na danych, takich jak migracja danych, hurtownia danych i analiza w czasie rzeczywistym, odtwarzanie awaryjne i wysoka dostępność, audyt i zgodność z przepisami itp.
Migracja danych
CDC upraszcza projekty migracji danych, umożliwiając przyrostowy transfer danych, skracając czas przestoju i minimalizując zakłócenia.
Hurtownia danych i analiza w czasie rzeczywistym
Funkcja CDC zapewnia, że magazyny danych i systemy analityczne są stale aktualizowane o najnowsze zmiany z baz danych operacyjnych.
Dzięki temu firmy mogą podejmować decyzje na podstawie informacji w czasie rzeczywistym.
Odtwarzanie awaryjne i wysoka dostępność
Funkcja CDC umożliwia replikowanie danych w czasie rzeczywistym do baz danych pomocniczych na potrzeby odtwarzania awaryjnego. W przypadku awarii mechanizm CDC umożliwia szybkie przełączenie na bazę danych pomocniczych, co minimalizuje czas przestoju i utratę danych.
Kontrola i zgodna z wymaganiami
CDC zapewnia szczegółowy ślad audytu zmian danych, który jest niezbędny do zapewnienia zgodności z wymaganiami prawnymi.
Co utworzysz
W tym laboratorium programistycznym utworzysz potok danych do przechwytywania zmian (CDC) za pomocą Cloud Pub/Sub, Dataproc, Pythona i Apache Spark. Twój potok:
- Symuluj zmiany w bazie danych i publikuj je jako zdarzenia w Cloud Pub/Sub, skalowalnej i niezawodnej usłudze przesyłania wiadomości.
- Korzystaj z potęgi Dataproc, zarządzanej usługi Spark i Hadoop w Google Cloud, aby przetwarzać te zdarzenia w czasie rzeczywistym.
Połączenie tych usług pozwoli Ci utworzyć niezawodny system, który będzie w miarę ich występowania rejestrować i przetwarzać zmiany w danych, zapewniając podstawę do analizy w czasie rzeczywistym, magazynowania danych i innych kluczowych aplikacji.
Czego się nauczysz
- Jak utworzyć podstawowy potok danych zmian
- Dataproc do przetwarzania strumieniowego
- Cloud Pub/Sub do przesyłania wiadomości w czasie rzeczywistym
- Podstawy Apache Spark
Ten moduł dotyczy Dataproc i Cloud Pub/Sub. Nieistotne koncepcje i bloki kodu zostały pominięte. Można je po prostu skopiować i wkleić.
Czego potrzebujesz
- aktywne konto GCP z zaimplementowanym projektem. Jeśli go nie masz, możesz zarejestrować się na bezpłatny okres próbny.
- zainstalowany i skonfigurowany interfejs wiersza poleceń gcloud.
- zainstalowana wersja Pythona 3.7 lub nowsza do symulowania zmian w bazie danych i interakcji z Pub/Sub;
- podstawowa znajomość usług Dataproc, Cloud Pub/Sub, Apache Spark i Python;
Zanim zaczniesz
Aby włączyć wymagane interfejsy API, uruchom w terminalu to polecenie:
gcloud services enable \
dataproc.googleapis.com \
pubsub.googleapis.com \
2. Konfigurowanie Cloud Pub/Sub
Tworzenie tematu
Ten temat będzie służyć do publikowania zmian w bazie danych. Zadania Dataproc będą odbiorcami tych wiadomości i będą je przetwarzać w celu rejestrowania danych o zmianach. Jeśli chcesz dowiedzieć się więcej o tych tematach, przeczytaj oficjalną dokumentację tutaj.
gcloud pubsub topics create database-changes
Tworzenie subskrypcji
Utwórz subskrypcję, która będzie używana do odbierania wiadomości w Pub/Sub. Więcej informacji o subskrypcjach znajdziesz w oficjalnej dokumentacji tutaj.
gcloud pubsub subscriptions create --topic database-changes change-subscriber
3. Symulowanie zmian w bazie danych
Kroki
- Utwórz skrypt Pythona (np.
simulate_cdc.py
) do symulowania zmian w bazie danych i publikowania ich w Pub/Sub.
from google.cloud import pubsub_v1
import json
import time
import random
project_id = "YOUR_PROJECT_ID" # Replace with your GCP project ID
topic_id = "database-changes"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
def publish_message(data):
data_str = json.dumps(data).encode("utf-8")
future = publisher.publish(topic_path, data=data_str)
print(f"Published message ID: {future.result()}")
def simulate_change():
change_types = ["INSERT", "UPDATE", "DELETE"]
change_type = random.choice(change_types)
record_id = random.randint(1, 100)
timestamp = time.time()
change_data = {
"change_type": change_type,
"record_id": record_id,
"timestamp": timestamp,
"data": {"field1": "value1", "field2": "value2"}, #Place holder data.
}
publish_message(change_data)
if __name__ == "__main__":
while True:
simulate_change()
time.sleep(2) # Simulate changes every 2 seconds
Zastąp parametr YOUR_PROJECT_ID identyfikatorem projektu GCP.
- Zainstaluj bibliotekę klienta Pub/Sub:
pip install google-cloud-pubsub
- Uruchom skrypt w terminalu. Ten skrypt będzie działać nieprzerwanie i publikować wiadomości w temacie Pub/Sub co 2 sekundy.
python simulate_cdc.py
- Po uruchomieniu skryptu przez około minutę będziesz mieć w Pub/Sub wystarczającą liczbę wiadomości do wykorzystania. Uruchomiony skrypt Pythona możesz zakończyć, naciskając Ctrl + C lub Cmd + C, w zależności od systemu operacyjnego.
- Wyświetlanie opublikowanych wiadomości:
Aby wyświetlić opublikowane wiadomości, otwórz inny terminal i uruchom to polecenie:
gcloud pubsub subscriptions pull --auto-ack change-subscriber
Powinien pojawić się wiersz tabeli zawierający wiadomość i inne pola:
{"change_type": "UPDATE", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}
Wyjaśnienie
- Skrypt Pythona symuluje zmiany w bazie danych, losowo generując zdarzenia
INSERT
,UPDATE
lubDELETE
. - Każda zmiana jest reprezentowana jako obiekt JSON zawierający typ zmiany, identyfikator rekordu, sygnaturę czasową i dane.
- Skrypt używa biblioteki klienta Cloud Pub/Sub do publikowania tych zdarzeń zmiany w temacie
database-changes
. - Polecenie subscriber pozwala wyświetlać wiadomości wysyłane do tematu pub/sub.
4. Tworzenie konta usługi Dataproc
W tej sekcji utworzysz konto usługi, którego może używać klaster Dataproc. Musisz też przypisać niezbędne uprawnienia, aby instancje klastra mogły uzyskać dostęp do Cloud Pub/Sub i Dataproc.
- Utwórz konto usługi:
export SERVICE_ACCOUNT_NAME="dataproc-service-account"
gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
- Dodaj rolę zasobu roboczego Dataproc, aby umożliwić kontu usługi tworzenie klastrów i uruchamianie zadań. Dodaj identyfikator konta usługi wygenerowany w poprzednim poleceniu jako członek w tym poleceniu:
export PROJECT=$(gcloud info --format='value(config.project)')
gcloud projects add-iam-policy-binding $PROJECT \
--role roles/dataproc.worker \
--member="serviceAccount:<your-service-account-with-domain>"
- Dodaj rolę subskrybenta Pub/Sub, aby umożliwić kontu usługi subskrybowanie subskrypcji Pub/Sub „change-subscriber”:
gcloud beta pubsub subscriptions add-iam-policy-binding \
change-subscriber \
--role roles/pubsub.subscriber \
--member="serviceAccount:<your-service-account-with-domain"
5. Tworzenie klastra Dataproc
Klaster Dataproc uruchomi aplikację Spark, która przetworzy wiadomości w Pub/Sub. Potrzebujesz konta usługi utworzonego w sekcji poprzedniej. Dataproc przypisuje to konto usługi do każdej instancji w klastrze, aby wszystkie instancje miały prawidłowe uprawnienia do uruchamiania aplikacji.
Aby utworzyć klaster Dataproc, użyj tego polecenia:
gcloud dataproc clusters create cdc-dataproc-cluster \
--region=us-central1 \
--zone=us-central1-a \
--scopes=pubsub,datastore \
--image-version=1.3 \
--service-account="<your-service-account-with-domain-id>"
6. Przesyłanie zadania Spark do klastra Dataproc
Aplikacja strumieniowego przetwarzania Spark przetwarza wiadomości o zmianach w bazie danych w Pub/Sub i wypisuje je w konsoli.
Kroki
- Utwórz katalog i dodaj kod źródłowy konsumenta do pliku PubsubConsumer.scala.
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala
package demo
import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object PubsubConsumer {
def createContext(projectID: String, checkpointDirectory: String)
: StreamingContext = {
// [START stream_setup]
val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
val ssc = new StreamingContext(sparkConf, Seconds(1))
// Set the checkpoint directory
val yarnTags = sparkConf.get("spark.yarn.tags")
val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
ssc.checkpoint(checkpointDirectory + '/' + jobId)
// Create stream
val messagesStream: DStream[String] = PubsubUtils
.createStream(
ssc,
projectID,
None,
"change-subscriber", // Cloud Pub/Sub subscription for incoming database updates
SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
.map(message => new String(message.getData(), StandardCharsets.UTF_8))
// [END stream_setup]
processStringDStream(messagesStream)
ssc
}
def processStringDStream(stringDStream: DStream[String]): Unit = {
stringDStream.foreachRDD { rdd =>
if (!rdd.isEmpty()) {
val listOfStrings: List[String] = rdd.collect().toList
listOfStrings.foreach(str => println(s"message received: $str"))
} else {
println("looking for message...")
}
}
}
def main(args: Array[String]): Unit = {
if (args.length != 2) {
System.err.println("arguments are not passed correctly!")
System.exit(1)
}
val Seq(projectID, checkpointDirectory) = args.toSeq
// Create Spark context
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
() => createContext(projectID, checkpointDirectory))
// Start streaming until we receive an explicit termination
ssc.start()
ssc.awaitTermination()
}
}
- Utwórz i dodaj do pliku pom.xml:
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<jvm.options.xms>-Xms512M</jvm.options.xms>
<jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
<jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
<jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
</properties>
<groupId>dataproc-spark-demos</groupId>
<artifactId>spark-streaming-pubsub-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-datastore</artifactId>
<version>1.34.0</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>spark-streaming-pubsub_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_2.11</artifactId>
<version>1.14.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.3.2</version>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
</goals>
<phase>compile</phase>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
<phase>test-compile</phase>
</execution>
<execution>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>demo.PubsubConsumer</mainClass>
</transformer>
</transformers>
<relocations>
<relocation>
<pattern>com</pattern>
<shadedPattern>repackaged.com</shadedPattern>
<includes>
<include>com.google.protobuf.**</include>
<include>com.google.common.**</include>
</includes>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
- Przejdź do katalogu spark projektu i zapisz ścieżkę w zmiennej środowiskowej, aby użyć jej później:
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
- Zmień katalog:
cd $REPO_ROOT/spark
- Pobierz Java 1.8 i umieść folder w /usr/lib/jvm/. Następnie zmień zmienną JAVA_HOME, aby wskazywała na:
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
- Kompilowanie pliku JAR aplikacji
mvn clean package
W katalogu spark/target
jest tworzone archiwum spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar zawierające kod aplikacji i zależne komponenty.
- Prześlij aplikację Spark:
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"
gcloud dataproc jobs submit spark \
--cluster cdc-dataproc-cluster \
--region us-central1 \
--async \
--jar target/$JAR \
--max-failures-per-hour 10 \
--properties $SPARK_PROPERTIES \
-- $ARGUMENTS
- Wyświetl listę aktywnych zadań i zapisz wartość
JOB_ID
dla danego zadania:
gcloud dataproc jobs list --region=us-central1 --state-filter=active
Dane wyjściowe będą wyglądać mniej więcej tak:
JOB_ID TYPE STATUS
473ecb6d14e2483cb88a18988a5b2e56 spark RUNNING
- Aby wyświetlić dane wyjściowe zadania, otwórz w przeglądarce ten adres URL. Zastąp [JOB_ID] wartością zapisaną w poprzednim kroku.
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
- Dane wyjściowe są podobne do tych:
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
Zadanie strumieniowego przetwarzania Sparka uruchomione w Dataproc pobiera wiadomości z Pub/Sub, przetwarza je i wyświetla dane wyjściowe w konsoli.
- Zakończenie zadania: uruchom to polecenie, aby zakończyć zadanie. Zastąp JOB_ID tym samym identyfikatorem, który został wcześniej zapisany
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet
Gratulacje! Właśnie utworzyliśmy potężny potok CDC, który rejestruje zmiany w bazie danych w Pub/Sub i przetwarza je za pomocą strumieniowego Sparka działającego w Dataproc.
7. Czyszczenie danych
Usuń utworzone zasoby, aby nie obciążać Cię za nie w przyszłości. Najprostszym sposobem na uniknięcie płatności jest usunięcie projektu utworzonego na potrzeby samouczka. Możesz też usunąć poszczególne zasoby.
Aby usunąć poszczególne zasoby, uruchom te polecenia
gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics delete database-changes --quiet
gcloud pubsub subscriptions delete change-subscriber --quiet
gcloud iam service-accounts delete <your-service-account-with-domain> --quiet
8. Gratulacje
Gratulacje! Właśnie ukończyłeś/ukończyłaś praktyczne ćwiczenie, które pokazuje, jak tworzyć niezawodne potoki danych w czasie rzeczywistym za pomocą Google Cloud Platform. Podsumujmy, czego się nauczyłeś/nauczyłaś:
- Symulowane przechwytywanie zmian danych (CDC): poznasz podstawy CDC i wdrożysz skrypt Pythona, aby symulować zmiany w bazie danych, generując zdarzenia, które reprezentują modyfikacje danych w czasie rzeczywistym.
- Wykorzystanie Cloud Pub/Sub: konfigurujesz tematy i subskrypcje Cloud Pub/Sub, zapewniając skalowalny i niezawodny system przesyłania wiadomości do przesyłania strumieniowego zdarzeń CDC. Symulowane zmiany w bazie danych zostały opublikowane w Pub/Sub, tworząc strumień danych w czasie rzeczywistym.
- Przetworzone dane za pomocą Dataproc i Spark: zarezerwowałeś klaster Dataproc i wdrożyłeś zadanie Spark Streaming, aby pobierać wiadomości z subskrypcji Pub/Sub. Otrzymane zdarzenia CDC zostały przetworzone w czasie rzeczywistym i wyświetlone w dziennikach zadań Dataproc.
- Utworzenie kompleksowego potoku danych w czasie rzeczywistym: udało Ci się zintegrować te usługi, aby utworzyć kompletny potok danych, który rejestruje, przesyła strumieniowo i przetwarza zmiany danych w czasie rzeczywistym. zdobyć praktyczne doświadczenie w budowaniu systemu, który może obsługiwać ciągłe strumienie danych;
- Użycie łącznika Spark Pub/Sub: klaster Dataproc został skonfigurowany do używania łącznika Spark Pub/Sub, który jest niezbędny do odczytywania danych z Pub/Sub przez Spark Structured Streaming.
Masz teraz solidne podstawy do tworzenia bardziej złożonych i zaawansowanych strumieni danych na potrzeby różnych aplikacji, w tym analizy w czasie rzeczywistym, magazynowania danych i architektury mikroserwisów. Kontynuuj eksplorowanie i tworzenie.