この Codelab について
1. はじめに
この Codelab では、段階的な移行アプローチを使用して Apache Kafka から Google Cloud Pub/Sub にアプリケーションを移行する方法について詳しく説明します。
Kafka と Pub/Sub の違いと段階的な移行アプローチについては、こちらをご覧ください。
作成するアプリの概要
このデモでは、次の作業を行います。
- GCE にセルフマネージド Kafka クラスタを設定する
- ランダムな文字列をストリーミングするシンプルな Kafka アプリケーションをデプロイする
- Pub/Sub を設定する
- Pub/Sub Kafka コネクタを使用して Kafka から Pub/Sub に移行する
学習内容
- GCE でセルフマネージド Kafka クラスタをセットアップする方法
- Kafka アプリケーションを Pub/Sub アプリケーションに移行する方法
必要なもの
- Google Cloud Platform にアクセスします(BigQuery と Pub/Sub の書き込み権限あり)。
- gcloud CLI のインストール。
- Java 8 以降がインストールされていること。
費用
このドキュメントでは、次の課金対象のプロダクト/サービスを使用します。
料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを出すことができます。
2. Kafka を設定する
この Codelab では、ZooKeeper を使用して Kafka を開始します。ローカル環境に Java 8 以降がインストールされている必要があります。
1. Kafka をインストールする
Kafka をダウンロードして展開します。バイナリ ダウンロードをおすすめします。
curl -O https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz
tar -xzf kafka_2.13-3.5.1.tgz
cd kafka_2.13-3.5.1
2. Zookeeper を開始
bin/zookeeper-server-start.sh config/zookeeper.properties
3. ブローカーを開始
Kafka ブローカー サービスを開始するには、別のターミナル セッションを開いて次のコマンドを実行します。
bin/kafka-server-start.sh config/server.properties
4. Kafka トピックを作成
Kafka アプリケーションの Kafka トピックを作成し、新しいターミナル セッションを開いて次のコマンドを実行します。
export TOPIC= "my-topic"
bin/kafka-topics.sh --create --topic $TOPIC --bootstrap-server localhost:9092
5. トピックの作成を確認する
bin/kafka-topics.sh --describe --topic $TOPIC --bootstrap-server localhost:9092
上記の cmd の出力は次のようになります。
Topic: my-topic TopicId: gf4ena9rSmyQXMTDj1bBmQ PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: my-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
3. Kafka アプリケーションを作成する
この Codelab では、1 つのプロデューサーと 2 つのコンシューマーを持つ Java Kafka アプリケーションを作成します。プロデューサーは、ランダムな文字列とタイムスタンプを Kafka トピックに定期的に送信します。
段階的な移行のデモを行うために、このアプリケーション用に 2 つのコンシューマを作成します。
- コンシューマ 1 - 読み取りメッセージを出力する
- コンシューマ 2 - BigQuery にメッセージを書き込みます。
新しいターミナルを開き、次のコマンドを実行します。これらのコマンドは Kafka のダウンロード ディレクトリでは実行しないでください。
1. 定数変数を設定する
export PROJECT_ID="<your project id>"
export DATASET_ID="<dataset name>"
export TABLE_ID="<table name>"
export TOPIC="my-topic"
2. Kafka アプリケーションの src をダウンロードする
git clone https://github.com/itodotimothy6/kafka-to-pubsub-demo.git
cd kafka-to-pubsub-demo
3. gcloud を構成して認証する
gcloud config set project $PROJECT_ID
gcloud auth application-default login
gcloud services enable bigquery.googleapis.com
4. BigQuery テーブルを作成する
このテーブルは、2 番目のコンシューマが出力を書き込むために使用されます。このテーブルのスキーマ定義は「message:STRING, timestamp:STRING」です。
bq mk --dataset --data_location US $PROJECT_ID:$DATASET_ID
bq mk --table $PROJECT_ID:$DATASET_ID.$TABLE_ID message:STRING,timestamp:STRING
5. プロデューサーを実行して、トピックへのメッセージ送信を開始します。
mvn clean install
mvn exec:java \
-Dexec.mainClass="org.kafka.SimpleKafkaProducer" \
-Dexec.args="$TOPIC"
出力ログは次のようになります。
...
Message sent: {"message":"283b7961-44cd-46d4-9061-5a22b8a1bdd7","timestamp":"2023-09-15 12:17:09"}
Message sent: {"message":"e4c2110a-ebbe-4c96-88d1-56ffdc2a3e9a","timestamp":"2023-09-15 12:17:14"}
...
6. トピックのメッセージをコンソールからログアウトする最初のコンシューマを実行する
mvn clean install
mvn exec:java \
-Dexec.mainClass="org.kafka.SimpleKafkaConsumer1" \
-Dexec.args="$TOPIC"
出力ログは次のようになります。
...
Received message: {"message":"72d46b42-5014-4d28-a6e3-04b65de63826","timestamp":"2023-09-15 12:32:47"}
Received message: {"message":"631464dc-2651-4cce-826f-c9442beb3e98","timestamp":"2023-09-15 12:32:52"}
...
7. Kafka トピックから BigQuery テーブルにメッセージを書き込む 2 番目のコンシューマを実行する
mvn clean install
mvn exec:java \
-Dexec.mainClass="org.kafka.SimpleKafkaConsumer2" \
-Dexec.args="$TOPIC $PROJECT_ID $DATASET_ID $TABLE_ID"
出力ログは次のようになります。
...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...
8. GCP コンソールでメッセージが BigQuery に正常に書き込まれていることを確認します
4. Pub/Sub の設定
1. Pub/Sub を有効にする
gcloud services enable pubsub.googleapis.com
2. Pub/Sub トピックの作成
このトピックは、最終的に Kafka トピックに代わるものです。わかりやすくするために、Kafka トピックと同じ名前を使用できます。
export TOPIC = "my-topic"
gcloud pubsub topics create $TOPIC
5. 段階的な移行
Kafka アプリケーションの設定が完了し、移行用の Pub/Sub トピックが準備できたので、Kafka から Pub/Sub への移行に進みます。
この移行デモでは、Google Cloud Pub/Sub グループの Pub/Sub Kafka コネクタを使用します。これにより、Kafka インフラストラクチャを段階的に移行できます。
フェーズ 1
Kafka トピックから Pub/Sub トピックにすべてのメッセージを転送するように Pub/Sub コネクタを構成する
1. コネクタ リポジトリをビルドして、kafka-to-pubsub コネクタ jar を取得する
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector
cd java-pubsub-group-kafka-connector/
mvn clean package -DskipTests=True
成功すると、target/pubsub-group-kafka-connector-${VERSION}.jar
に結果の jar が表示されます。
jar のフルパスを含む変数を作成します。
export KAFKA_CONNECT_JAR="path/to/target/pubsub-group-kafka-connector-${VERSION}.jar"
2. インストール済みの Kafka 構成を Kafka Connect 構成で更新する
ディレクトリを先ほどの Kafka ダウンロード フォルダに変更します。
cd kafka_2.13-3.5.1
Kafka ダウンロード フォルダの /config/connect-standalone.properties
を開き、ダウンロードしたコネクタ jar のファイルパスを plugin.path に追加し、必要に応じて行をコメント化解除します。または、以下の cmd を実行します。
echo "plugin.path=$KAFKA_CONNECT_JAR" >> config/connect-standalone.properties
3. 移行に必要な Kafka トピック、pubsub プロジェクト、Pub/Sub トピックを含む CloudPubSubSinkConnector
構成ファイルを作成します。 CloudPubSubSinkConnector
構成ファイルのサンプルについては、こちらをご覧ください。
cat <<EOF > config/cps-sink-connector.properties
name=CPSSinkConnector
connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
tasks.max=10
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
topics=$TOPIC
cps.project=$PROJECT_ID
cps.topic=$TOPIC
EOF
4. コネクタを起動し、Kafka トピックから Pub/Sub へのメッセージの転送を開始します。
bin/connect-standalone.sh \
config/connect-standalone.properties \
config/cps-sink-connector.properties
GCP コンソールで、メッセージが Pub/Sub トピックに転送されていることを確認する
フェーズ 2
Pub/Sub トピックからメッセージを受信するようにコンシューマ アプリケーションを更新すると、プロデューサーは引き続き Kafka にメッセージをパブリッシュします。
1. コンソールにメッセージを出力するコンシューマを更新して、Pub/Sub にサブスクライブします。サンプルの kafka-to-pubsub-demo
src では、SimplePubsubscriber1
は Pub/Sub トピックから読み取るように更新されます。
Pub/Sub サブスクリプションの作成
export SUBSCRIPTION_ID="sub1"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC
更新されたサブスクライバー アプリケーションを実行する
cd kafka-to-pubsub-demo
mvn exec:java \
-Dexec.mainClass="org.pubsub.SimplePubsubSubscriber1" \
-Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID"
出力ログは次のようになります。
...
Id: 8827699929893588
Data: {"message":"08afe1db-2ace-466d-bcf9-77ffc80a7f58","timestamp":"2023-09-15 15:57:34"}
Id: 8827853608001203
Data: {"message":"557960f7-5f2e-4156-84de-e270127c99de","timestamp":"2023-09-15 15:57:39"}
...
2. BigQuery に書き込むコンシューマを更新して Pub/Sub に登録する。サンプルの kafka-to-pubsub-demo
src では、SimplePubsubscriber1
は Pub/Sub トピックから読み取るように更新されます。
Pub/Sub サブスクリプションの作成
export SUBSCRIPTION_ID="sub2"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC
更新されたサブスクライバー アプリケーションを実行する
cd kafka-to-pubsub-demo
mvn exec:java \
-Dexec.mainClass="org.pubsub.SimplePubsubSubscriber2" \
-Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID $DATASET_ID $TABLE_ID"
出力ログは次のようになります。
...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...
フェーズ 3
Pub/Sub に直接公開するようにプロデューサーを更新する
- Kafka ではなく Pub/Sub に書き込むように、Kafka プロデューサーの src を更新します。サンプルの
kafka-to-pubsub-demo
src では、Pub/Sub トピックにメッセージを送信するようにSimplePubsubPublisher
が更新されます。 - コネクタを停止します。コネクタを停止するには、kafka-connect ターミナル セッションで実行中のコネクタを終了します。
- 更新されたパブリッシャー アプリケーションを実行する
cd kafka-to-pubsub-demo
mvn exec:java \
-Dexec.mainClass="org.pubsub.SimplePubsubPublisher" \
-Dexec.args="$PROJECT_ID $TOPIC"