Dataproc と Cloud Pub/Sub を使用して Change Data Capture を作成する

Dataproc と Cloud Pub/Sub を使用して Change Data Capture を作成する

この Codelab について

subject最終更新: 6月 19, 2025
account_circle作成者: Jatin Narula

1. はじめに

df8070bd84336207.png

最終更新日: 2025 年 6 月 19 日

変更データ キャプチャとは

変更データ キャプチャ(CDC)は、データベースで変更されたデータを特定して追跡するために使用される一連のソフトウェア設計パターンです。簡単に説明すると、データに加えられた変更をキャプチャして記録し、それらの変更を他のシステムに複製できるようにする方法です。

変更データ キャプチャ(CDC)は、データ移行、リアルタイム データ ウェアハウジングと分析、障害復旧と高可用性、監査とコンプライアンスなど、データドリブンのさまざまなシナリオで非常に役立ちます。

データの移行

CDC は、増分データ転送を可能にしてダウンタイムを短縮し、中断を最小限に抑えることで、データ移行プロジェクトを簡素化します。

リアルタイム データ ウェアハウジングと分析

CDC により、データ ウェアハウスと分析システムがオペレーショナル データベースの最新の変更で常に更新されるようにします。

これにより、企業はリアルタイムの情報に基づいて意思決定を行うことができます。

障害復旧と高可用性

CDC を使用すると、障害復旧のためにセカンダリ データベースにデータをリアルタイムでレプリケートできます。障害が発生した場合、CDC によりセカンダリ データベースへの迅速なフェイルオーバーが可能になり、ダウンタイムとデータ損失を最小限に抑えることができます。

監査とコンプライアンス

CDC は、コンプライアンスと規制要件に不可欠な、データ変更の詳細な監査証跡を提供します。

作成するアプリの概要

この Codelab では、Cloud Pub/Sub、Dataproc、Python、Apache Spark を使用して、変更データ キャプチャ(CDC)データ パイプラインを構築します。パイプラインは次の処理を行います。

  • データベースの変更をシミュレートし、スケーラブルで信頼性の高いメッセージング サービスである Cloud Pub/Sub にイベントとしてパブリッシュします。
  • Google Cloud のマネージド Spark と Hadoop サービスである Dataproc を活用して、これらのイベントをリアルタイムで処理します。

これらのサービスを接続することで、データ変更が発生したときにキャプチャして処理できる堅牢なパイプラインを作成し、リアルタイム分析、データ ウェアハウジング、その他の重要なアプリケーションの基盤を構築できます。

学習内容

  • 基本的な変更データ キャプチャ パイプラインを作成する方法
  • ストリーム処理用の Dataproc
  • リアルタイム メッセージング用の Cloud Pub/Sub
  • Apache Spark の基本

この Codelab では、Dataproc と Cloud Pub/Sub を中心に説明します。関連のない概念やコードブロックについては詳しく触れず、コードはコピーして貼るだけの状態で提供されています。

必要なもの

  • プロジェクトが設定された有効な GCP アカウント。アカウントをお持ちでない場合は、無料トライアルに登録できます。
  • gcloud CLI がインストールされ、構成されている。
  • データベースの変更をシミュレートし、Pub/Sub を操作するために Python 3.7 以降がインストールされている。
  • Dataproc、Cloud Pub/Sub、Apache Spark、Python に関する基本的な知識。

始める前に

ターミナルで次のコマンドを実行して、必要な API を有効にします。

gcloud services enable \
    dataproc
.googleapis.com \
    pubsub
.googleapis.com \

2. Cloud Pub/Sub をセットアップする

トピックを作成する

このトピックは、データベースの変更を公開するために使用されます。Dataproc ジョブはこれらのメッセージの使用者であり、変更データ キャプチャのためにメッセージを処理します。トピックについて詳しくは、こちらの公式ドキュメントをご覧ください。

gcloud pubsub topics create database-changes

サブスクリプションを作成する

Pub/Sub でメッセージを使用するためにサブスクリプションを作成します。サブスクリプションの詳細については、こちらの公式ドキュメントをご覧ください。

gcloud pubsub subscriptions create --topic database-changes change-subscriber

3. データベースの変更をシミュレートする

手順

  1. Python スクリプトを作成します(simulate_cdc.py)を使用して、データベースの変更をシミュレートし、Pub/Sub にパブリッシュします。
from google.cloud import pubsub_v1
import json
import time
import random

project_id = "YOUR_PROJECT_ID"  # Replace with your GCP project ID
topic_id = "database-changes"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

def publish_message(data):
   
data_str = json.dumps(data).encode("utf-8")
   
future = publisher.publish(topic_path, data=data_str)
   
print(f"Published message ID: {future.result()}")

def simulate_change():
   
change_types = ["INSERT", "UPDATE", "DELETE"]
   
change_type = random.choice(change_types)
   
record_id = random.randint(1, 100)
   
timestamp = time.time()
   
change_data = {
       
"change_type": change_type,
       
"record_id": record_id,
       
"timestamp": timestamp,
       
"data": {"field1": "value1", "field2": "value2"}, #Place holder data.
   
}
   
publish_message(change_data)

if __name__ == "__main__":
   
while True:
       
simulate_change()
       
time.sleep(2)  # Simulate changes every 2 seconds

YOUR_PROJECT_ID は実際の GCP プロジェクト ID に置き換えてください

  1. Pub/Sub クライアント ライブラリをインストールします。
pip install google-cloud-pubsub
  1. ターミナルでスクリプトを実行します。このスクリプトは継続的に実行され、2 秒ごとに Pub/Sub トピックにメッセージをパブリッシュします。
python simulate_cdc.py
  1. スクリプトを 1 分間実行すると、Pub/Sub に使用できる十分なメッセージが送信されます。実行中の Python スクリプトを終了するには、OS に応じて Ctrl+C または Cmd+C キーを押します。
  2. パブリッシュされたメッセージを表示する:

別のターミナルを開き、次のコマンドを実行して公開されたメッセージを表示します。

gcloud pubsub subscriptions pull --auto-ack change-subscriber

メッセージとその他のフィールドを含むテーブルの行が表示されます。

{"change_type": "UPDATE", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}

説明

  • Python スクリプトは、INSERT イベント、UPDATE イベント、または DELETE イベントをランダムに生成して、データベースの変更をシミュレートします。
  • 各変更は、変更タイプ、レコード ID、タイムスタンプ、データを含む JSON オブジェクトとして表されます。
  • このスクリプトは、Cloud Pub/Sub クライアント ライブラリを使用して、これらの変更イベントを database-changes トピックにパブリッシュします。
  • subscriber コマンドを使用すると、Pub/Sub トピックに送信されているメッセージを表示できます。

4. Dataproc のサービス アカウントを作成する

このセクションでは、Dataproc クラスタが使用できるサービス アカウントを作成します。また、クラスタ インスタンスが Cloud Pub/Sub と Dataproc にアクセスするために必要な権限を割り当てます。

  1. サービス アカウントの作成:
export SERVICE_ACCOUNT_NAME="dataproc-service-account"

gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
  1. Dataproc ワーカーロールを追加して、サービス アカウントがクラスタを作成してジョブを実行できるようにします。前のコマンドで生成されたサービス アカウント ID を、次のコマンドのメンバとして追加します。
export PROJECT=$(gcloud info --format='value(config.project)')

gcloud projects add-iam-policy-binding $PROJECT \
        --role roles/dataproc.worker \
        --member="serviceAccount:<your-service-account-with-domain>"
  1. Pub/Sub サブスクライバーのロールを追加して、サービス アカウントが「change-subscriber」Pub/Sub サブスクリプションに登録できるようにします。
gcloud beta pubsub subscriptions add-iam-policy-binding \
        change
-subscriber \
       
--role roles/pubsub.subscriber \
       
--member="serviceAccount:<your-service-account-with-domain"

5. Dataproc クラスタを作成する

Dataproc クラスタは、Pub/Sub 内のメッセージを処理する Spark アプリを実行します。前のセクションで作成したサービス アカウントが必要です。Dataproc はこのサービス アカウントをクラスタ内のすべてのインスタンスに割り当て、すべてのインスタンスがアプリを実行するための適切な権限を取得できるようにします。

次のコマンドを使用して Dataproc クラスタを作成します。

gcloud dataproc clusters create cdc-dataproc-cluster \
    --region=us-central1 \
    --zone=us-central1-a \
    --scopes=pubsub,datastore \
    --image-version=1.3 \
    --service-account="<your-service-account-with-domain-id>"

6. Dataproc クラスタに Spark ジョブを送信する

Spark ストリーミング アプリは、Pub/Sub でデータベース変更メッセージを処理し、コンソールに出力します。

手順

  1. ディレクトリを作成し、コンシューマのソースコードを PubsubConsumer.scala ファイルに追加します。
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc
-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala
package demo

import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PubsubConsumer {

 
def createContext(projectID: String, checkpointDirectory: String)
   
: StreamingContext = {

   
// [START stream_setup]
   
val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
   
val ssc = new StreamingContext(sparkConf, Seconds(1))

   
// Set the checkpoint directory
   
val yarnTags = sparkConf.get("spark.yarn.tags")
   
val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
   
ssc.checkpoint(checkpointDirectory + '/' + jobId)
   
   
// Create stream
   
val messagesStream: DStream[String] = PubsubUtils
     
.createStream(
       
ssc,
       
projectID,
       
None,
       
"change-subscriber",  // Cloud Pub/Sub subscription for incoming database updates
       
SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
     
.map(message => new String(message.getData(), StandardCharsets.UTF_8))
   
// [END stream_setup]

   
processStringDStream(messagesStream)
   
       
ssc
 
}

 
def processStringDStream(stringDStream: DStream[String]): Unit = {
   
stringDStream.foreachRDD { rdd =>
     
if (!rdd.isEmpty()) {
       
val listOfStrings: List[String] = rdd.collect().toList
       
listOfStrings.foreach(str => println(s"message received: $str"))
     
} else {
       
println("looking for message...")
     
}
   
}
 
}

 
def main(args: Array[String]): Unit = {
   
if (args.length != 2) {
     
System.err.println("arguments are not passed correctly!")
     
System.exit(1)
   
}

   
val Seq(projectID, checkpointDirectory) = args.toSeq

   
// Create Spark context
   
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
     
() => createContext(projectID, checkpointDirectory))

   
// Start streaming until we receive an explicit termination
   
ssc.start()
   
ssc.awaitTermination()
 
}

}
  1. 以下を作成して pom.xml に追加します。
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <jvm.options.xms>-Xms512M</jvm.options.xms>
    <jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
    <jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
    <jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
  </properties>

  <groupId>dataproc-spark-demos</groupId>
  <artifactId>spark-streaming-pubsub-demo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.2.0</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-datastore</artifactId>
      <version>1.34.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>spark-streaming-pubsub_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>

    <dependency>
      <groupId>org.scalacheck</groupId>
      <artifactId>scalacheck_2.11</artifactId>
      <version>1.14.0</version>
      <scope>test</scope>
    </dependency>

  </dependencies>

  <build>
    <plugins>

      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.3.2</version>
        <executions>
          <execution>
            <id>compile</id>
            <goals>
              <goal>compile</goal>
            </goals>
            <phase>compile</phase>
          </execution>
          <execution>
            <id>test-compile</id>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <phase>test-compile</phase>
          </execution>
          <execution>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>demo.PubsubConsumer</mainClass>
                </transformer>
              </transformers>
              <relocations>
                <relocation>
                  <pattern>com</pattern>
                  <shadedPattern>repackaged.com</shadedPattern>
                  <includes>
                    <include>com.google.protobuf.**</include>
                    <include>com.google.common.**</include>
                  </includes>
                </relocation>
              </relocations>
            </configuration>
          </execution>
        </executions>
      </plugin>

    </plugins>
  </build>

</project>
  1. プロジェクトの spark ディレクトリに移動し、後で使用できるようにパスを環境変数に保存します。
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
  1. ディレクトリを変更します。
cd $REPO_ROOT/spark
  1. Java 1.8 をダウンロードし、フォルダを /usr/lib/jvm/ に配置します。次に、JAVA_HOME を変更して、次を指すようにします。
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
  1. アプリケーション jar をビルドする
mvn clean package

アプリケーション コードと依存関係を含む spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar アーカイブが spark/target ディレクトリに作成されます。

  1. Spark アプリケーションを送信します。
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"

gcloud dataproc jobs submit spark \
    --cluster cdc-dataproc-cluster \
    --region us-central1 \
    --async \
    --jar target/$JAR \
    --max-failures-per-hour 10 \
    --properties $SPARK_PROPERTIES \
    -- $ARGUMENTS
  1. アクティブなジョブのリストを表示して、該当するジョブの JOB_ID 値を記録します。
gcloud dataproc jobs list --region=us-central1 --state-filter=active

出力は次のようになります。

JOB_ID                            TYPE   STATUS
473ecb6d14e2483cb88a18988a5b2e56  spark  RUNNING
  1. ジョブの出力を表示するために、ブラウザで次の URL を開きます。[JOB_ID] は、前の手順でメモした値に置き換えます。
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
  1. 出力は次のようになります。
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...

Dataproc で実行されている Spark ストリーミング ジョブは、Pub/Sub からメッセージを取得し、処理してコンソールに出力を表示します。

  1. ジョブの終了: 次のコマンドを実行してジョブを終了します。JOB_ID は、先ほどメモした同じ ID に置き換えます。
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet

おめでとうございます。Pub/Sub でデータベースの変更をキャプチャし、Cloud Dataproc で実行されている Spark ストリーミングを使用して処理する強力な CDC パイプラインを作成しました。

7. クリーンアップ

作成したすべてのリソースをクリーンアップして、これ以降リソースの料金が発生しないようにします。課金を停止する最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。あるいは、リソースを個別に削除することもできます。

次のコマンドを実行して、個々のリソースを削除します。

gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics
delete database-changes --quiet
gcloud pubsub subscriptions
delete change-subscriber --quiet
gcloud iam service
-accounts delete <your-service-account-with-domain> --quiet

8. 完了

これで、Google Cloud Platform を使用して堅牢なリアルタイム データ パイプラインを構築する方法を紹介するハンズオン Codelab は終了です。ここまで学んだことを確認しましょう。

  • 変更データ キャプチャ(CDC)のシミュレーション: CDC の基本を学び、Python スクリプトを実装してデータベースの変更をシミュレートし、リアルタイムのデータ変更を表すイベントを生成しました。
  • Cloud Pub/Sub を活用: Cloud Pub/Sub トピックとサブスクリプションを設定し、CDC イベントのストリーミングにスケーラブルで信頼性の高いメッセージング サービスを提供します。シミュレートされたデータベース変更を Pub/Sub にパブリッシュし、リアルタイム データ ストリームを作成しました。
  • Dataproc と Spark で処理されたデータ: Dataproc クラスタをプロビジョニングし、Spark Streaming ジョブをデプロイして Pub/Sub サブスクリプションからメッセージを使用しました。受信した CDC イベントをリアルタイムで処理して変換し、結果を Dataproc ジョブログに表示しました。
  • エンドツーエンドのリアルタイム パイプラインを構築しました。これらのサービスを正常に統合し、データ変更をリアルタイムでキャプチャ、ストリーミング、処理する完全なデータ パイプラインを作成しました。連続データ ストリームを処理できるシステムの構築に関する実践的な経験を積みました。
  • Spark Pub/Sub コネクタを使用しました。Spark Pub/Sub コネクタを使用するように Dataproc クラスタを正常に構成しました。これは、Spark Structured Streaming が Pub/Sub からデータを読み取るために重要です。

これで、リアルタイム分析、データ ウェアハウジング、マイクロサービス アーキテクチャなど、さまざまなアプリケーション向けに、より複雑で高度なデータ パイプラインを構築するための堅固な基盤ができました。引き続き探索と構築を続けてください。

リファレンス ドキュメント