1. Введение

Последнее обновление: 19.06.2025
Что такое отслеживание изменений данных?
Захват изменений данных (Change Data Capture, CDC) — это набор шаблонов проектирования программного обеспечения, используемых для определения и отслеживания данных, изменившихся в базе данных. Проще говоря, это способ фиксации и записи изменений, внесенных в данные, чтобы эти изменения можно было реплицировать в другие системы.
Функция отслеживания изменений данных (Change Data Capture, CDC) невероятно полезна в широком спектре сценариев, основанных на данных, таких как миграция данных, хранилища данных и аналитика в реальном времени, аварийное восстановление и обеспечение высокой доступности, аудит и соответствие требованиям и т. д.
Миграция данных
CDC упрощает проекты миграции данных, позволяя осуществлять поэтапную передачу данных, сокращая время простоя и минимизируя сбои.
Хранилища данных и аналитика в режиме реального времени
CDC обеспечивает постоянное обновление хранилищ данных и аналитических систем с учетом последних изменений в оперативных базах данных.
Это позволяет предприятиям принимать решения на основе информации в режиме реального времени.
Аварийное восстановление и высокая доступность
CDC обеспечивает репликацию данных в режиме реального времени на резервные базы данных для целей аварийного восстановления. В случае сбоя CDC позволяет быстро переключиться на резервную базу данных, минимизируя время простоя и потерю данных.
Аудит и соответствие требованиям
CDC предоставляет подробный журнал аудита изменений данных, что крайне важно для соблюдения нормативных требований.
Что вы построите
В этом практическом задании вы создадите конвейер обработки данных для отслеживания изменений (CDC) с использованием Cloud Pub/Sub, Dataproc, Python и Apache Spark. Ваш конвейер будет:
- Имитируйте изменения в базе данных и публикуйте их в виде событий в Cloud Pub/Sub, масштабируемый и надежный сервис обмена сообщениями.
- Воспользуйтесь возможностями Dataproc, управляемого сервиса Spark и Hadoop от Google Cloud, для обработки этих событий в режиме реального времени.
Объединив эти сервисы, вы создадите надежный конвейер, способный фиксировать и обрабатывать изменения данных по мере их возникновения, обеспечивая основу для аналитики в реальном времени, хранилищ данных и других важных приложений.
Что вы узнаете
- Как создать базовый конвейер отслеживания изменений данных
- Dataproc для обработки потоковых данных
- Облачная система публикации/подписки для обмена сообщениями в режиме реального времени.
- Основы Apache Spark
Данный практический урок посвящен Dataproc и Cloud Pub/Sub. Несущественные концепции и фрагменты кода опущены и предоставлены для простого копирования и вставки.
Что вам понадобится
- Для работы требуется активный аккаунт GCP с настроенным проектом. Если у вас его нет, вы можете зарегистрироваться для бесплатной пробной версии.
- Интерфейс командной строки gcloud установлен и настроен.
- Установлен Python 3.7+ для имитации изменений в базе данных и взаимодействия с системой публикации/подписки.
- Базовые знания Dataproc, Cloud Pub/Sub, Apache Spark и Python.
Прежде чем начать
Выполните следующую команду в терминале, чтобы включить необходимые API:
gcloud services enable \
dataproc.googleapis.com \
pubsub.googleapis.com \
2. Настройка облачной системы публикации/подписки
Создать тему
Эта тема будет использоваться для публикации изменений в базе данных. Задание Dataproc будет получать эти сообщения и обрабатывать их для отслеживания изменений данных. Более подробную информацию о темах можно найти в официальной документации здесь .
gcloud pubsub topics create database-changes
Создать подписку
Создайте подписку, которая будет использоваться для получения сообщений из раздела Pub/Sub. Более подробную информацию о подписках можно найти в официальной документации здесь .
gcloud pubsub subscriptions create --topic database-changes change-subscriber
3. Имитация изменений в базе данных
Шаги
- Создайте скрипт на Python (например,
simulate_cdc.py) для имитации изменений в базе данных и публикации их в Pub/Sub.
from google.cloud import pubsub_v1
import json
import time
import random
project_id = "YOUR_PROJECT_ID" # Replace with your GCP project ID
topic_id = "database-changes"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
def publish_message(data):
data_str = json.dumps(data).encode("utf-8")
future = publisher.publish(topic_path, data=data_str)
print(f"Published message ID: {future.result()}")
def simulate_change():
change_types = ["INSERT", "UPDATE", "DELETE"]
change_type = random.choice(change_types)
record_id = random.randint(1, 100)
timestamp = time.time()
change_data = {
"change_type": change_type,
"record_id": record_id,
"timestamp": timestamp,
"data": {"field1": "value1", "field2": "value2"}, #Place holder data.
}
publish_message(change_data)
if __name__ == "__main__":
while True:
simulate_change()
time.sleep(2) # Simulate changes every 2 seconds
Замените YOUR_PROJECT_ID на фактический идентификатор вашего проекта GCP.
- Установите клиентскую библиотеку Pub/Sub:
pip install google-cloud-pubsub
- Запустите скрипт в терминале. Этот скрипт будет работать непрерывно и публиковать сообщения каждые 2 секунды в топик Pub/Sub.
python simulate_cdc.py
- После выполнения скрипта, скажем, в течение 1 минуты, в папке Pub/Sub накопится достаточно сообщений для обработки. Вы можете завершить работу скрипта Python, нажав Ctrl + C или Cmd + C, в зависимости от вашей операционной системы.
- Просмотреть опубликованные сообщения:
Откройте другой терминал и выполните следующую команду, чтобы просмотреть опубликованные сообщения:
gcloud pubsub subscriptions pull --auto-ack change-subscriber
Вы должны увидеть строку таблицы, содержащую сообщение и другие поля:
{"change_type": "UPDATE", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}
Объяснение
- Этот скрипт на Python имитирует изменения в базе данных, генерируя случайным образом события
INSERT,UPDATEилиDELETE. - Каждое изменение представляется в виде объекта JSON, содержащего тип изменения, идентификатор записи, метку времени и дату.
- Скрипт использует клиентскую библиотеку Cloud Pub/Sub для публикации этих событий изменений в топик
database-changes. - Команда subscriber позволяет просматривать сообщения, отправляемые в тему pub/sub.
4. Создайте учетную запись службы для Dataproc.
В этом разделе вы создаете учетную запись службы, которую сможет использовать кластер Dataproc. Вы также назначаете необходимые разрешения, чтобы экземпляры кластера могли получать доступ к Cloud Pub/sub и Dataproc.
- Создайте учетную запись службы:
export SERVICE_ACCOUNT_NAME="dataproc-service-account"
gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
- Добавьте роль Dataproc worker, чтобы разрешить учетной записи службы создавать кластеры и запускать задания. Добавьте идентификатор учетной записи службы, сгенерированный в предыдущей команде, в качестве участника в следующей команде:
export PROJECT=$(gcloud info --format='value(config.project)')
gcloud projects add-iam-policy-binding $PROJECT \
--role roles/dataproc.worker \
--member="serviceAccount:<your-service-account-with-domain>"
- Добавьте роль подписчика Pub/sub, чтобы разрешить учетной записи службы подписаться на подписку Pub/sub "change-subscriber":
gcloud beta pubsub subscriptions add-iam-policy-binding \
change-subscriber \
--role roles/pubsub.subscriber \
--member="serviceAccount:<your-service-account-with-domain"
5. Создайте кластер Dataproc.
Кластер Dataproc будет запускать приложение Spark, которое будет обрабатывать сообщения в системе Pub/sub. Вам потребуется учетная запись службы, созданная в предыдущем разделе. Dataproc назначает эту учетную запись службы каждому экземпляру в кластере, чтобы все экземпляры получили необходимые разрешения для запуска приложения.
Для создания кластера Dataproc используйте следующую команду:
gcloud dataproc clusters create cdc-dataproc-cluster \
--region=us-central1 \
--zone=us-central1-a \
--scopes=pubsub,datastore \
--image-version=1.3 \
--service-account="<your-service-account-with-domain-id>"
6. Отправьте задание Spark в кластер Dataproc.
Приложение Spark Streaming обрабатывает сообщения об изменениях в базе данных в режиме публикации/подписки и выводит их в консоль.
Шаги
- Создайте директорию и добавьте исходный код потребителя в файл PubsubConsumer.scala.
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala
package demo
import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object PubsubConsumer {
def createContext(projectID: String, checkpointDirectory: String)
: StreamingContext = {
// [START stream_setup]
val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
val ssc = new StreamingContext(sparkConf, Seconds(1))
// Set the checkpoint directory
val yarnTags = sparkConf.get("spark.yarn.tags")
val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
ssc.checkpoint(checkpointDirectory + '/' + jobId)
// Create stream
val messagesStream: DStream[String] = PubsubUtils
.createStream(
ssc,
projectID,
None,
"change-subscriber", // Cloud Pub/Sub subscription for incoming database updates
SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
.map(message => new String(message.getData(), StandardCharsets.UTF_8))
// [END stream_setup]
processStringDStream(messagesStream)
ssc
}
def processStringDStream(stringDStream: DStream[String]): Unit = {
stringDStream.foreachRDD { rdd =>
if (!rdd.isEmpty()) {
val listOfStrings: List[String] = rdd.collect().toList
listOfStrings.foreach(str => println(s"message received: $str"))
} else {
println("looking for message...")
}
}
}
def main(args: Array[String]): Unit = {
if (args.length != 2) {
System.err.println("arguments are not passed correctly!")
System.exit(1)
}
val Seq(projectID, checkpointDirectory) = args.toSeq
// Create Spark context
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
() => createContext(projectID, checkpointDirectory))
// Start streaming until we receive an explicit termination
ssc.start()
ssc.awaitTermination()
}
}
- Создайте и добавьте следующее в файл pom.xml.
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<jvm.options.xms>-Xms512M</jvm.options.xms>
<jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
<jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
<jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
</properties>
<groupId>dataproc-spark-demos</groupId>
<artifactId>spark-streaming-pubsub-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-datastore</artifactId>
<version>1.34.0</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>spark-streaming-pubsub_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_2.11</artifactId>
<version>1.14.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.3.2</version>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
</goals>
<phase>compile</phase>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
<phase>test-compile</phase>
</execution>
<execution>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>demo.PubsubConsumer</mainClass>
</transformer>
</transformers>
<relocations>
<relocation>
<pattern>com</pattern>
<shadedPattern>repackaged.com</shadedPattern>
<includes>
<include>com.google.protobuf.**</include>
<include>com.google.common.**</include>
</includes>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
- Перейдите в каталог проекта Spark и сохраните путь в переменной среды для дальнейшего использования:
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
- Сменить каталог:
cd $REPO_ROOT/spark
- Загрузите Java 1.8 и поместите папку в /usr/lib/jvm/. Затем измените переменную JAVA_HOME, указав следующий путь:
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
- Соберите JAR-файл приложения.
mvn clean package
В каталоге spark/target создается архив spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar, содержащий код приложения и зависимости.
- Отправьте заявку на участие в программе Spark:
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"
gcloud dataproc jobs submit spark \
--cluster cdc-dataproc-cluster \
--region us-central1 \
--async \
--jar target/$JAR \
--max-failures-per-hour 10 \
--properties $SPARK_PROPERTIES \
-- $ARGUMENTS
- Отобразите список активных заданий и обратите внимание на значение
JOB_IDдля каждого задания:
gcloud dataproc jobs list --region=us-central1 --state-filter=active
Результат будет выглядеть примерно так:
JOB_ID TYPE STATUS
473ecb6d14e2483cb88a18988a5b2e56 spark RUNNING
- Чтобы просмотреть результаты выполнения задания, откройте в браузере следующий URL-адрес. Замените [JOB_ID] значением, указанным на предыдущем шаге.
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
- Результат будет примерно следующим:
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
Задача Spark Streaming, выполняемая в Dataproc, извлекает сообщения из системы Pub/sub, обрабатывает их и выводит результат в консоль.
- Завершение задания: Выполните следующую команду, чтобы завершить задание. Замените JOB_ID на тот же идентификатор, который мы указали ранее.
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet
Поздравляем! Вы только что создали мощный конвейер CDC, который фиксирует изменения в базе данных в режиме публикации/подписки и обрабатывает их с помощью Spark Streaming, работающего в Cloud Dataproc.
7. Уборка
Удалите все созданные вами ресурсы, чтобы в будущем с вас не взималась плата за них. Самый простой способ избежать оплаты — удалить проект, созданный для этого урока. В качестве альтернативы вы можете удалить отдельные ресурсы.
Выполните следующие команды для удаления отдельных ресурсов.
gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics delete database-changes --quiet
gcloud pubsub subscriptions delete change-subscriber --quiet
gcloud iam service-accounts delete <your-service-account-with-domain> --quiet
8. Поздравляем!
Поздравляем! Вы только что завершили практическое занятие, демонстрирующее, как создать надежный конвейер обработки данных в реальном времени с использованием Google Cloud Platform. Давайте подведем итоги вашей работы:
- Имитация изменений данных (CDC): Вы изучили основы CDC и реализовали скрипт на Python для имитации изменений в базе данных, генерируя события, которые отражают изменения данных в реальном времени.
- Использование Cloud Pub/Sub: Вы настраиваете темы и подписки Cloud Pub/Sub, предоставляя масштабируемый и надежный сервис обмена сообщениями для потоковой передачи событий CDC. Вы публикуете смоделированные изменения базы данных в Pub/Sub, создавая поток данных в реальном времени.
- Обработка данных с помощью Dataproc и Spark: Вы создали кластер Dataproc и развернули задание Spark Streaming для получения сообщений из вашей подписки Pub/Sub. Вы обрабатывали и преобразовывали входящие события CDC в режиме реального времени, отображая результаты в журналах задания Dataproc.
- Создание сквозного конвейера обработки данных в реальном времени: Вы успешно интегрировали эти сервисы для создания полного конвейера обработки данных, который собирает, передает потоком и обрабатывает изменения данных в реальном времени. Вы получили практический опыт в создании системы, способной обрабатывать непрерывные потоки данных.
- Использован коннектор Spark Pub/Sub: Вы успешно настроили кластер Dataproc для использования коннектора Spark Pub/Sub, что крайне важно для чтения данных из Pub/Sub с помощью Spark Structured Streaming.
Теперь у вас есть прочная основа для построения более сложных и совершенных конвейеров обработки данных для различных приложений, включая аналитику в реальном времени, хранилища данных и микросервисные архитектуры. Продолжайте исследовать и создавать!