Dataproc 및 Cloud Pub/Sub를 사용하여 Change Data Capture 빌드

Dataproc 및 Cloud Pub/Sub를 사용하여 Change Data Capture 빌드

이 Codelab 정보

subject최종 업데이트: 6월 19, 2025
account_circle작성자: Jatin Narula

1. 소개

df8070bd84336207.png

최종 업데이트: 2025년 6월 19일

변경 데이터 캡처란 무엇인가요?

변경 데이터 캡처 (CDC)는 데이터베이스에서 변경된 데이터를 확인하고 추적하는 데 사용되는 소프트웨어 설계 패턴의 집합입니다. 간단히 말해 데이터에 적용된 변경사항을 캡처하고 기록하여 이러한 변경사항을 다른 시스템에 복제할 수 있는 방법입니다.

변경 데이터 캡처 (CDC)는 데이터 마이그레이션, 실시간 데이터 웨어하우스 및 분석, 재해 복구 및 고가용성, 감사 및 규정 준수와 같은 다양한 데이터 기반 시나리오에서 매우 유용합니다.

데이터 이전

CDC는 점진적인 데이터 전송을 허용하고 다운타임을 줄이며 서비스 중단을 최소화하여 데이터 이전 프로젝트를 간소화합니다.

실시간 데이터 웨어하우징 및 분석

CDC를 사용하면 데이터 웨어하우스와 분석 시스템이 운영 데이터베이스의 최신 변경사항으로 지속적으로 업데이트됩니다.

이를 통해 비즈니스는 실시간 정보를 기반으로 결정을 내릴 수 있습니다.

재해 복구 및 고가용성

CDC를 사용하면 재해 복구를 위해 보조 데이터베이스에 데이터를 실시간으로 복제할 수 있습니다. 장애가 발생하면 CDC를 통해 보조 데이터베이스로 빠르게 장애 조치하여 다운타임과 데이터 손실을 최소화할 수 있습니다.

감사 및 규정 준수

CDC는 규정 준수 및 규제 요구사항에 필수적인 데이터 변경사항에 대한 자세한 감사 추적을 제공합니다.

빌드할 항목

이 Codelab에서는 Cloud Pub/Sub, Dataproc, Python, Apache Spark를 사용하여 변경 데이터 캡처 (CDC) 데이터 파이프라인을 빌드합니다. 파이프라인은 다음을 실행합니다.

  • 데이터베이스 변경사항을 시뮬레이션하고 확장 가능하고 안정적인 메시지 서비스인 Cloud Pub/Sub에 이벤트로 게시합니다.
  • Google Cloud의 관리형 Spark 및 Hadoop 서비스인 Dataproc의 강력한 기능을 활용하여 이러한 이벤트를 실시간으로 처리하세요.

이러한 서비스를 연결하면 데이터 변경사항이 발생할 때 이를 캡처하고 처리할 수 있는 강력한 파이프라인을 만들어 실시간 분석, 데이터 웨어하우스, 기타 중요한 애플리케이션의 기반을 제공할 수 있습니다.

학습할 내용

  • 기본 변경 데이터 캡처 파이프라인을 만드는 방법
  • 스트림 처리를 위한 Dataproc
  • 실시간 메시징을 위한 Cloud Pub/Sub
  • Apache Spark의 기본사항

이 Codelab에서는 Dataproc 및 Cloud Pub/Sub에 중점을 둡니다. 따라서 이와 관련 없는 개념과 코드 블록은 그냥 넘어가겠습니다. 단, 필요할 때 복사해서 붙여넣을 수 있도록 다른 설명 없이 제공만 해드리겠습니다.

필요한 항목

  • 프로젝트가 설정된 활성 GCP 계정 계정이 없는 경우 무료 체험을 신청할 수 있습니다.
  • gcloud CLI가 설치되고 구성되었습니다.
  • 데이터베이스 변경사항을 시뮬레이션하고 Pub/Sub와 상호작용하기 위해 설치된 Python 3.7 이상
  • Dataproc, Cloud Pub/Sub, Apache Spark, Python에 관한 기본 지식

시작하기 전에

터미널에서 다음 명령어를 실행하여 필요한 API를 사용 설정합니다.

gcloud services enable \
    dataproc
.googleapis.com \
    pubsub
.googleapis.com \

2. Cloud Pub/Sub 설정

주제 만들기

이 주제는 데이터베이스 변경사항을 게시하는 데 사용됩니다. Dataproc 작업은 이러한 메시지의 소비자가 되어 변경 데이터 캡처를 위해 메시지를 처리합니다. 이 주제에 관해 자세히 알아보려면 여기에서 공식 문서를 참고하세요.

gcloud pubsub topics create database-changes

정기 결제 만들기

Pub/Sub에서 메시지를 소비하는 데 사용할 구독을 만듭니다. 구독에 관한 자세한 내용은 여기에서 공식 문서를 참고하세요.

gcloud pubsub subscriptions create --topic database-changes change-subscriber

3. 데이터베이스 변경사항 시뮬레이션

단계

  1. Python 스크립트 (예: simulate_cdc.py)를 사용하여 데이터베이스 변경사항을 시뮬레이션하고 Pub/Sub에 게시합니다.
from google.cloud import pubsub_v1
import json
import time
import random

project_id = "YOUR_PROJECT_ID"  # Replace with your GCP project ID
topic_id = "database-changes"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

def publish_message(data):
   
data_str = json.dumps(data).encode("utf-8")
   
future = publisher.publish(topic_path, data=data_str)
   
print(f"Published message ID: {future.result()}")

def simulate_change():
   
change_types = ["INSERT", "UPDATE", "DELETE"]
   
change_type = random.choice(change_types)
   
record_id = random.randint(1, 100)
   
timestamp = time.time()
   
change_data = {
       
"change_type": change_type,
       
"record_id": record_id,
       
"timestamp": timestamp,
       
"data": {"field1": "value1", "field2": "value2"}, #Place holder data.
   
}
   
publish_message(change_data)

if __name__ == "__main__":
   
while True:
       
simulate_change()
       
time.sleep(2)  # Simulate changes every 2 seconds

YOUR_PROJECT_ID를 실제 GCP 프로젝트 ID로 바꿉니다.

  1. Pub/Sub 클라이언트 라이브러리를 설치합니다.
pip install google-cloud-pubsub
  1. 터미널에서 스크립트를 실행합니다. 이 스크립트는 연속으로 실행되며 2초마다 Pub/Sub 주제에 메시지를 게시합니다.
python simulate_cdc.py
  1. 스크립트를 1분 동안 실행하면 Pub/Sub에 소비할 만큼 충분한 메시지가 생성됩니다. OS에 따라 Ctrl + C 또는 Cmd + C를 눌러 실행 중인 Python 스크립트를 종료할 수 있습니다.
  2. 게시된 메시지 보기:

다른 터미널을 열고 다음 명령어를 실행하여 게시된 메시지를 확인합니다.

gcloud pubsub subscriptions pull --auto-ack change-subscriber

메시지와 기타 필드가 포함된 표 행이 표시됩니다.

{"change_type": "UPDATE", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}

설명

  • Python 스크립트는 INSERT, UPDATE 또는 DELETE 이벤트를 무작위로 생성하여 데이터베이스 변경사항을 시뮬레이션합니다.
  • 각 변경사항은 변경 유형, 레코드 ID, 타임스탬프, 데이터가 포함된 JSON 객체로 표시됩니다.
  • 이 스크립트는 Cloud Pub/Sub 클라이언트 라이브러리를 사용하여 이러한 변경 이벤트를 database-changes 주제에 게시합니다.
  • subscriber 명령어를 사용하면 pub/sub 주제에 전송되는 메시지를 볼 수 있습니다.

4. Dataproc용 서비스 계정 만들기

이 섹션에서는 Dataproc 클러스터가 사용할 수 있는 서비스 계정을 만듭니다. 또한 클러스터 인스턴스가 Cloud Pub/Sub 및 Dataproc에 액세스하는 데 필요한 권한을 할당합니다.

  1. 서비스 계정을 만듭니다.
export SERVICE_ACCOUNT_NAME="dataproc-service-account"

gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
  1. 서비스 계정이 클러스터를 만들고 작업을 실행할 수 있도록 Dataproc 작업자 역할을 추가합니다. 이전 명령어에서 생성된 서비스 계정 ID를 아래 명령어의 구성원으로 추가합니다.
export PROJECT=$(gcloud info --format='value(config.project)')

gcloud projects add-iam-policy-binding $PROJECT \
        --role roles/dataproc.worker \
        --member="serviceAccount:<your-service-account-with-domain>"
  1. 서비스 계정이 'change-subscriber' Pub/Sub 구독을 구독할 수 있도록 Pub/Sub 구독자 역할을 추가합니다.
gcloud beta pubsub subscriptions add-iam-policy-binding \
        change
-subscriber \
       
--role roles/pubsub.subscriber \
       
--member="serviceAccount:<your-service-account-with-domain"

5. Dataproc 클러스터 만들기

Dataproc 클러스터는 Pub/Sub의 메시지를 처리하는 Spark 앱을 실행합니다. 이전 섹션에서 만든 서비스 계정이 필요합니다. Dataproc은 클러스터의 모든 인스턴스에 이 서비스 계정을 할당하여 앱을 실행하는 데 필요한 권한을 부여합니다.

다음 명령어를 사용하여 Dataproc 클러스터를 만듭니다.

gcloud dataproc clusters create cdc-dataproc-cluster \
    --region=us-central1 \
    --zone=us-central1-a \
    --scopes=pubsub,datastore \
    --image-version=1.3 \
    --service-account="<your-service-account-with-domain-id>"

6. Dataproc 클러스터에 Spark 작업 제출

Spark 스트리밍 앱은 Pub/Sub에서 데이터베이스 변경 메시지를 처리하고 콘솔에 출력합니다.

단계

  1. 디렉터리를 만들고 PubsubConsumer.scala 파일에 소비자의 소스 코드를 추가합니다.
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc
-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala
package demo

import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PubsubConsumer {

 
def createContext(projectID: String, checkpointDirectory: String)
   
: StreamingContext = {

   
// [START stream_setup]
   
val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
   
val ssc = new StreamingContext(sparkConf, Seconds(1))

   
// Set the checkpoint directory
   
val yarnTags = sparkConf.get("spark.yarn.tags")
   
val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
   
ssc.checkpoint(checkpointDirectory + '/' + jobId)
   
   
// Create stream
   
val messagesStream: DStream[String] = PubsubUtils
     
.createStream(
       
ssc,
       
projectID,
       
None,
       
"change-subscriber",  // Cloud Pub/Sub subscription for incoming database updates
       
SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
     
.map(message => new String(message.getData(), StandardCharsets.UTF_8))
   
// [END stream_setup]

   
processStringDStream(messagesStream)
   
       
ssc
 
}

 
def processStringDStream(stringDStream: DStream[String]): Unit = {
   
stringDStream.foreachRDD { rdd =>
     
if (!rdd.isEmpty()) {
       
val listOfStrings: List[String] = rdd.collect().toList
       
listOfStrings.foreach(str => println(s"message received: $str"))
     
} else {
       
println("looking for message...")
     
}
   
}
 
}

 
def main(args: Array[String]): Unit = {
   
if (args.length != 2) {
     
System.err.println("arguments are not passed correctly!")
     
System.exit(1)
   
}

   
val Seq(projectID, checkpointDirectory) = args.toSeq

   
// Create Spark context
   
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
     
() => createContext(projectID, checkpointDirectory))

   
// Start streaming until we receive an explicit termination
   
ssc.start()
   
ssc.awaitTermination()
 
}

}
  1. 다음을 만들고 pom.xml에 추가합니다.
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <jvm.options.xms>-Xms512M</jvm.options.xms>
    <jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
    <jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
    <jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
  </properties>

  <groupId>dataproc-spark-demos</groupId>
  <artifactId>spark-streaming-pubsub-demo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.2.0</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-datastore</artifactId>
      <version>1.34.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>spark-streaming-pubsub_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>

    <dependency>
      <groupId>org.scalacheck</groupId>
      <artifactId>scalacheck_2.11</artifactId>
      <version>1.14.0</version>
      <scope>test</scope>
    </dependency>

  </dependencies>

  <build>
    <plugins>

      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.3.2</version>
        <executions>
          <execution>
            <id>compile</id>
            <goals>
              <goal>compile</goal>
            </goals>
            <phase>compile</phase>
          </execution>
          <execution>
            <id>test-compile</id>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <phase>test-compile</phase>
          </execution>
          <execution>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>demo.PubsubConsumer</mainClass>
                </transformer>
              </transformers>
              <relocations>
                <relocation>
                  <pattern>com</pattern>
                  <shadedPattern>repackaged.com</shadedPattern>
                  <includes>
                    <include>com.google.protobuf.**</include>
                    <include>com.google.common.**</include>
                  </includes>
                </relocation>
              </relocations>
            </configuration>
          </execution>
        </executions>
      </plugin>

    </plugins>
  </build>

</project>
  1. 프로젝트의 Spark 디렉터리로 변경하고 나중에 사용할 수 있도록 경로를 환경 변수에 저장합니다.
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
  1. 디렉터리를 변경합니다.
cd $REPO_ROOT/spark
  1. Java 1.8을 다운로드하고 폴더를 /usr/lib/jvm/에 배치합니다. 그런 다음 JAVA_HOME을 다음을 가리키도록 변경합니다.
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
  1. 애플리케이션 jar 빌드
mvn clean package

애플리케이션 코드 및 종속 항목이 포함된 spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar 보관 파일이 spark/target 디렉터리에 생성됩니다.

  1. Spark 애플리케이션을 제출합니다.
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"

gcloud dataproc jobs submit spark \
    --cluster cdc-dataproc-cluster \
    --region us-central1 \
    --async \
    --jar target/$JAR \
    --max-failures-per-hour 10 \
    --properties $SPARK_PROPERTIES \
    -- $ARGUMENTS
  1. 활성 작업 목록을 표시하고 작업의 JOB_ID 값을 기록해 둡니다.
gcloud dataproc jobs list --region=us-central1 --state-filter=active

출력은 다음과 유사합니다.

JOB_ID                            TYPE   STATUS
473ecb6d14e2483cb88a18988a5b2e56  spark  RUNNING
  1. 브라우저에서 다음 URL을 열어 작업 출력을 확인합니다. [JOB_ID] 를 이전 단계에서 기록한 값으로 바꿉니다.
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
  1. 출력은 다음과 비슷합니다.
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...

Dataproc에서 실행되는 Spark 스트리밍 작업은 Pub/Sub에서 메시지를 가져와 처리하고 콘솔에 출력을 표시합니다.

  1. 작업 종료: 다음 명령어를 실행하여 작업을 종료합니다. JOB_ID를 앞에서 기록한 것과 동일한 것으로 바꿉니다.
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet

축하합니다. Pub/Sub에서 데이터베이스 변경사항을 캡처하고 Cloud Dataproc에서 실행되는 Spark 스트리밍을 사용하여 처리하는 강력한 CDC 파이프라인을 만들었습니다.

7. 삭제

이후에 요금이 청구되지 않도록 생성한 리소스를 모두 삭제합니다. 비용이 청구되지 않도록 하는 가장 쉬운 방법은 가이드에서 만든 프로젝트를 삭제하는 것입니다. 개별 리소스를 삭제할 수도 있습니다.

다음 명령어를 실행하여 개별 리소스를 삭제합니다.

gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics
delete database-changes --quiet
gcloud pubsub subscriptions
delete change-subscriber --quiet
gcloud iam service
-accounts delete <your-service-account-with-domain> --quiet

8. 축하합니다

축하합니다. Google Cloud Platform을 사용하여 강력한 실시간 데이터 파이프라인을 빌드하는 방법을 보여주는 실습 Codelab을 완료하셨습니다. 지금까지 달성한 내용을 요약해 보겠습니다.

  • 시뮬레이션된 변경 데이터 캡처 (CDC): CDC의 기본사항을 학습하고 Python 스크립트를 구현하여 데이터베이스 변경을 시뮬레이션하고 실시간 데이터 수정을 나타내는 이벤트를 생성했습니다.
  • Cloud Pub/Sub 활용: Cloud Pub/Sub 주제와 구독을 설정하여 CDC 이벤트를 스트리밍하기 위한 확장 가능하고 안정적인 메시징 서비스를 제공합니다. 시뮬레이션된 데이터베이스 변경사항을 Pub/Sub에 게시하여 실시간 데이터 스트림을 만들었습니다.
  • Dataproc 및 Spark로 처리된 데이터: Dataproc 클러스터를 프로비저닝하고 Pub/Sub 구독에서 메시지를 소비하도록 Spark 스트리밍 작업을 배포했습니다. 수신되는 CDC 이벤트를 실시간으로 처리 및 변환하여 Dataproc 작업 로그에 결과를 표시했습니다.
  • 엔드 투 엔드 실시간 파이프라인 구축: 이러한 서비스를 통합하여 실시간으로 데이터 변경사항을 캡처, 스트리밍, 처리하는 완전한 데이터 파이프라인을 만들었습니다. 연속 데이터 스트림을 처리할 수 있는 시스템을 빌드하는 실무 경험을 얻었습니다.
  • Spark Pub/Sub 커넥터 사용: Spark Structured Streaming이 Pub/Sub에서 데이터를 읽는 데 중요한 Spark Pub/Sub 커넥터를 사용하도록 Dataproc 클러스터를 구성했습니다.

이제 실시간 분석, 데이터 웨어하우스, 마이크로서비스 아키텍처를 비롯한 다양한 애플리케이션을 위한 더 복잡하고 정교한 데이터 파이프라인을 빌드할 수 있는 탄탄한 기반을 갖추게 되었습니다. 계속 탐색하고 빌드하세요.

참조 문서