Apache Kafka から Pub/Sub への移行

Apache Kafka から Pub/Sub への移行

この Codelab について

subject最終更新: 10月 4, 2023
account_circle作成者: Timothy Itodo & Sri Harshini Donthineni

1. はじめに

この Codelab では、段階的な移行アプローチを使用して Apache Kafka から Google Cloud Pub/Sub にアプリケーションを移行する方法について詳しく説明します。

Kafka と Pub/Sub の違いと段階的な移行アプローチについては、こちらをご覧ください。

このデモでは、次の作業を行います。

  • GCE にセルフマネージド Kafka クラスタを設定する
  • ランダムな文字列をストリーミングするシンプルな Kafka アプリケーションをデプロイする
  • Pub/Sub を設定する
  • Pub/Sub Kafka コネクタを使用して Kafka から Pub/Sub に移行する

学習内容

  • GCE でセルフマネージド Kafka クラスタをセットアップする方法
  • Kafka アプリケーションを Pub/Sub アプリケーションに移行する方法

必要なもの

  • Google Cloud Platform にアクセスします(BigQuery と Pub/Sub の書き込み権限あり)。
  • gcloud CLI のインストール。
  • Java 8 以降がインストールされていること。

費用

このドキュメントでは、次の課金対象のプロダクト/サービスを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを出すことができます。

2. Kafka を設定する

この Codelab では、ZooKeeper を使用して Kafka を開始します。ローカル環境に Java 8 以降がインストールされている必要があります。

1. Kafka をインストールする

Kafka をダウンロードして展開します。バイナリ ダウンロードをおすすめします。

curl -O https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz
tar
-xzf kafka_2.13-3.5.1.tgz
cd kafka_2
.13-3.5.1

2. Zookeeper を開始

bin/zookeeper-server-start.sh config/zookeeper.properties

3. ブローカーを開始

Kafka ブローカー サービスを開始するには、別のターミナル セッションを開いて次のコマンドを実行します。

bin/kafka-server-start.sh config/server.properties

4. Kafka トピックを作成

Kafka アプリケーションの Kafka トピックを作成し、新しいターミナル セッションを開いて次のコマンドを実行します。

export TOPIC= "my-topic"
bin
/kafka-topics.sh --create --topic $TOPIC --bootstrap-server localhost:9092

5. トピックの作成を確認する

bin/kafka-topics.sh --describe --topic $TOPIC --bootstrap-server localhost:9092

上記の cmd の出力は次のようになります。

Topic: my-topic   TopicId: gf4ena9rSmyQXMTDj1bBmQ PartitionCount: 1   ReplicationFactor: 1    Configs:
 
Topic: my-topic Partition: 0    Leader: 0   Replicas: 0 Isr: 0

3. Kafka アプリケーションを作成する

この Codelab では、1 つのプロデューサーと 2 つのコンシューマーを持つ Java Kafka アプリケーションを作成します。プロデューサーは、ランダムな文字列とタイムスタンプを Kafka トピックに定期的に送信します。

段階的な移行のデモを行うために、このアプリケーション用に 2 つのコンシューマを作成します。

  • コンシューマ 1 - 読み取りメッセージを出力する
  • コンシューマ 2 - BigQuery にメッセージを書き込みます。

新しいターミナルを開き、次のコマンドを実行します。これらのコマンドは Kafka のダウンロード ディレクトリでは実行しないでください。

1. 定数変数を設定する

export PROJECT_ID="<your project id>"
export DATASET_ID="<dataset name>"
export TABLE_ID="<table name>"
export TOPIC="my-topic"

2. Kafka アプリケーションの src をダウンロードする

git clone https://github.com/itodotimothy6/kafka-to-pubsub-demo.git
cd kafka
-to-pubsub-demo

3. gcloud を構成して認証する

gcloud config set project $PROJECT_ID
gcloud auth application
-default login
gcloud services enable bigquery
.googleapis.com

4. BigQuery テーブルを作成する

このテーブルは、2 番目のコンシューマが出力を書き込むために使用されます。このテーブルのスキーマ定義は「message:STRING, timestamp:STRING」です。

bq mk --dataset --data_location US $PROJECT_ID:$DATASET_ID 
bq mk
--table $PROJECT_ID:$DATASET_ID.$TABLE_ID message:STRING,timestamp:STRING

5. プロデューサーを実行して、トピックへのメッセージ送信を開始します。

mvn clean install
mvn
exec:java \
 
-Dexec.mainClass="org.kafka.SimpleKafkaProducer" \
 
-Dexec.args="$TOPIC"

出力ログは次のようになります。

...
Message sent: {"message":"283b7961-44cd-46d4-9061-5a22b8a1bdd7","timestamp":"2023-09-15 12:17:09"}
Message sent: {"message":"e4c2110a-ebbe-4c96-88d1-56ffdc2a3e9a","timestamp":"2023-09-15 12:17:14"}
...

6. トピックのメッセージをコンソールからログアウトする最初のコンシューマを実行する

mvn clean install
mvn
exec:java \
 
-Dexec.mainClass="org.kafka.SimpleKafkaConsumer1" \
 
-Dexec.args="$TOPIC"

出力ログは次のようになります。

...
Received message: {"message":"72d46b42-5014-4d28-a6e3-04b65de63826","timestamp":"2023-09-15 12:32:47"}
Received message: {"message":"631464dc-2651-4cce-826f-c9442beb3e98","timestamp":"2023-09-15 12:32:52"}
...

7. Kafka トピックから BigQuery テーブルにメッセージを書き込む 2 番目のコンシューマを実行する

mvn clean install
mvn
exec:java \
 
-Dexec.mainClass="org.kafka.SimpleKafkaConsumer2" \
 
-Dexec.args="$TOPIC $PROJECT_ID $DATASET_ID $TABLE_ID"

出力ログは次のようになります。

...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...

8. GCP コンソールでメッセージが BigQuery に正常に書き込まれていることを確認します

8734b356c59543af.png

4. Pub/Sub の設定

1. Pub/Sub を有効にする

gcloud services enable pubsub.googleapis.com

2. Pub/Sub トピックの作成

このトピックは、最終的に Kafka トピックに代わるものです。わかりやすくするために、Kafka トピックと同じ名前を使用できます。

export TOPIC = "my-topic"
gcloud pubsub topics create $TOPIC

5. 段階的な移行

Kafka アプリケーションの設定が完了し、移行用の Pub/Sub トピックが準備できたので、Kafka から Pub/Sub への移行に進みます。

この移行デモでは、Google Cloud Pub/Sub グループの Pub/Sub Kafka コネクタを使用します。これにより、Kafka インフラストラクチャを段階的に移行できます。

フェーズ 1

Kafka トピックから Pub/Sub トピックにすべてのメッセージを転送するように Pub/Sub コネクタを構成する

1. コネクタ リポジトリをビルドして、kafka-to-pubsub コネクタ jar を取得する

git clone https://github.com/googleapis/java-pubsub-group-kafka-connector
cd java
-pubsub-group-kafka-connector/
mvn clean
package -DskipTests=True

成功すると、target/pubsub-group-kafka-connector-${VERSION}.jar に結果の jar が表示されます。

jar のフルパスを含む変数を作成します。

export KAFKA_CONNECT_JAR="path/to/target/pubsub-group-kafka-connector-${VERSION}.jar"

2. インストール済みの Kafka 構成を Kafka Connect 構成で更新する

ディレクトリを先ほどの Kafka ダウンロード フォルダに変更します。

cd kafka_2.13-3.5.1

Kafka ダウンロード フォルダの /config/connect-standalone.properties を開き、ダウンロードしたコネクタ jar のファイルパスを plugin.path に追加し、必要に応じて行をコメント化解除します。または、以下の cmd を実行します。

echo "plugin.path=$KAFKA_CONNECT_JAR" >> config/connect-standalone.properties

3. 移行に必要な Kafka トピック、pubsub プロジェクト、Pub/Sub トピックを含む CloudPubSubSinkConnector 構成ファイルを作成します。 CloudPubSubSinkConnector 構成ファイルのサンプルについては、こちらをご覧ください。

cat <<EOF > config/cps-sink-connector.properties
name
=CPSSinkConnector
connector
.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
tasks
.max=10
key
.converter=org.apache.kafka.connect.storage.StringConverter
value
.converter=org.apache.kafka.connect.converters.ByteArrayConverter
topics
=$TOPIC
cps
.project=$PROJECT_ID
cps
.topic=$TOPIC
EOF

4. コネクタを起動し、Kafka トピックから Pub/Sub へのメッセージの転送を開始します。

bin/connect-standalone.sh \
config
/connect-standalone.properties \
config
/cps-sink-connector.properties

GCP コンソールで、メッセージが Pub/Sub トピックに転送されていることを確認する

フェーズ 2

Pub/Sub トピックからメッセージを受信するようにコンシューマ アプリケーションを更新すると、プロデューサーは引き続き Kafka にメッセージをパブリッシュします。

1. コンソールにメッセージを出力するコンシューマを更新して、Pub/Sub にサブスクライブします。サンプルの kafka-to-pubsub-demo src では、SimplePubsubscriber1 は Pub/Sub トピックから読み取るように更新されます。

Pub/Sub サブスクリプションの作成

export SUBSCRIPTION_ID="sub1"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID
--topic=$TOPIC

更新されたサブスクライバー アプリケーションを実行する

cd kafka-to-pubsub-demo
mvn
exec:java \
 
-Dexec.mainClass="org.pubsub.SimplePubsubSubscriber1" \
 
-Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID"

出力ログは次のようになります。

...
Id: 8827699929893588
Data: {"message":"08afe1db-2ace-466d-bcf9-77ffc80a7f58","timestamp":"2023-09-15 15:57:34"}
Id: 8827853608001203
Data: {"message":"557960f7-5f2e-4156-84de-e270127c99de","timestamp":"2023-09-15 15:57:39"}
...

2. BigQuery に書き込むコンシューマを更新して Pub/Sub に登録する。サンプルの kafka-to-pubsub-demo src では、SimplePubsubscriber1 は Pub/Sub トピックから読み取るように更新されます。

Pub/Sub サブスクリプションの作成

export SUBSCRIPTION_ID="sub2"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID
--topic=$TOPIC

更新されたサブスクライバー アプリケーションを実行する

cd kafka-to-pubsub-demo
mvn
exec:java \
 
-Dexec.mainClass="org.pubsub.SimplePubsubSubscriber2" \
 
-Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID $DATASET_ID $TABLE_ID"

出力ログは次のようになります。

...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...

フェーズ 3

Pub/Sub に直接公開するようにプロデューサーを更新する

  1. Kafka ではなく Pub/Sub に書き込むように、Kafka プロデューサーの src を更新します。サンプルの kafka-to-pubsub-demo src では、Pub/Sub トピックにメッセージを送信するように SimplePubsubPublisher が更新されます。
  2. コネクタを停止します。コネクタを停止するには、kafka-connect ターミナル セッションで実行中のコネクタを終了します。
  3. 更新されたパブリッシャー アプリケーションを実行する
cd kafka-to-pubsub-demo
mvn
exec:java \
 
-Dexec.mainClass="org.pubsub.SimplePubsubPublisher" \
 
-Dexec.args="$PROJECT_ID $TOPIC"

6. 完了

これで、セルフマネージド Kafka アプリケーションを Pub/Sub に移行する Codelab は終了です。

詳細については、次のリンクをご覧ください。