1. Introduction
This codelab is a step-by-step guide to migrating applications from Apache Kafka to Google Cloud Pub/Sub using the Phased Migration approach.
You can learn more about the differences between Kafka and Pub/Sub and the Phased Migration approach here.
What you'll build
In this demo, you will:
- Set up a self-managed Kafka cluster on GCE
- Deploy a simple Kafka application that streams random strings
- Set up Pub/Sub
- Migrate from Kafka to Pub/Sub using the Pub/Sub Kafka Connector
What you'll learn
- How to set up a self-managed Kafka cluster on GCE
- How to migrate a Kafka application to a Pub/Sub application
What you'll need
- Access to Google Cloud Platform (with write permissions for BigQuery & Pub/Sub).
- gcloud CLI installed.
- Java 8+ installed.
Cost
In this codelab, you will use billable Google Cloud products and services, including Pub/Sub and BigQuery.
To generate a cost estimate based on your projected usage, use the pricing calculator.
2. Setup Kafka
In this codelab, we'll start Kafka using ZooKeeper. Your local environment must have Java 8+ installed.
1. Install Kafka
Download Kafka and extract it. We recommend the binary download to follow along:
curl -O https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz
tar -xzf kafka_2.13-3.5.1.tgz
cd kafka_2.13-3.5.1
2. Start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
3. Start broker
To start the Kafka broker service, open another terminal session and run:
bin/kafka-server-start.sh config/server.properties
4. Create a Kafka topic
To create a Kafka topic for the Kafka application, open a new terminal session and run:
export TOPIC="my-topic"
bin/kafka-topics.sh --create --topic $TOPIC --bootstrap-server localhost:9092
5. Confirm topic creation
bin/kafka-topics.sh --describe --topic $TOPIC --bootstrap-server localhost:9092
Output of the above command should look similar to:
Topic: my-topic TopicId: gf4ena9rSmyQXMTDj1bBmQ PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: my-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
3. Create a Kafka Application
In this codelab, we will create a Java Kafka application that has one producer and two consumers. The producer periodically sends random strings and a timestamp to a Kafka topic; a sketch of its core loop is shown after the consumer list below.
To demonstrate Phased Migration, we'll create two consumers for this application:
- Consumer 1 - Prints out the read messages
- Consumer 2 - Writes the messages into BigQuery
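Here is a minimal, hypothetical sketch of the producer's core loop, assuming the JSON payload format (random UUID plus timestamp) and the roughly five-second publish interval seen in the output logs later in this section; the actual SimpleKafkaProducer in the repo may differ in details.

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Illustrative class name; not part of the demo repo.
public class ProducerSketch {
    public static void main(String[] args) throws Exception {
        String topic = args[0]; // e.g. "my-topic"
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // local broker from the previous section
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
            while (true) {
                // Build a JSON payload with a random string and a timestamp.
                String payload = String.format(
                        "{\"message\":\"%s\",\"timestamp\":\"%s\"}",
                        UUID.randomUUID(), LocalDateTime.now().format(fmt));
                producer.send(new ProducerRecord<>(topic, payload));
                System.out.println("Message sent: " + payload);
                Thread.sleep(5000); // assumed interval between messages
            }
        }
    }
}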
Open a new terminal and run the following commands. Do not run these commands in the Kafka download directory.
1. Set environment variables
export PROJECT_ID="<your project id>"
export DATASET_ID="<dataset name>"
export TABLE_ID="<table name>"
export TOPIC="my-topic"
2. Download the Kafka application src
git clone https://github.com/itodotimothy6/kafka-to-pubsub-demo.git
cd kafka-to-pubsub-demo
3. Configure and authenticate gcloud
gcloud config set project $PROJECT_ID
gcloud auth application-default login
gcloud services enable bigquery.googleapis.com
4. Create a BigQuery table
This table is used by the second consumer to write the output. The schema definition of the table is "message:STRING, timestamp:STRING".
bq mk --dataset --data_location US $PROJECT_ID:$DATASET_ID
bq mk --table $PROJECT_ID:$DATASET_ID.$TABLE_ID message:STRING,timestamp:STRING
5. Run the producer to begin sending messages to the topic
mvn clean install
mvn exec:java \
-Dexec.mainClass="org.kafka.SimpleKafkaProducer" \
-Dexec.args="$TOPIC"
Output logs should look similar to:
...
Message sent: {"message":"283b7961-44cd-46d4-9061-5a22b8a1bdd7","timestamp":"2023-09-15 12:17:09"}
Message sent: {"message":"e4c2110a-ebbe-4c96-88d1-56ffdc2a3e9a","timestamp":"2023-09-15 12:17:14"}
...
6. Run the first consumer, which logs messages from the topic to the console
mvn clean install
mvn exec:java \
-Dexec.mainClass="org.kafka.SimpleKafkaConsumer1" \
-Dexec.args="$TOPIC"
Output logs should look similar to:
...
Received message: {"message":"72d46b42-5014-4d28-a6e3-04b65de63826","timestamp":"2023-09-15 12:32:47"}
Received message: {"message":"631464dc-2651-4cce-826f-c9442beb3e98","timestamp":"2023-09-15 12:32:52"}
...
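For reference, here is a minimal, hypothetical sketch of a console-logging Kafka consumer like SimpleKafkaConsumer1; the group id, poll timeout, and class name are illustrative assumptions, and the repo's code may differ.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Illustrative class name; not part of the demo repo.
public class ConsumerSketch {
    public static void main(String[] args) {
        String topic = args[0]; // e.g. "my-topic"
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "console-consumer"); // assumed consumer group
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("auto.offset.reset", "earliest");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));
            while (true) {
                // Poll the topic and print every record's value.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received message: " + record.value());
                }
            }
        }
    }
}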
7. Run the second consumer, which writes messages from the Kafka topic to a BigQuery table
mvn clean install
mvn exec:java \
-Dexec.mainClass="org.kafka.SimpleKafkaConsumer2" \
-Dexec.args="$TOPIC $PROJECT_ID $DATASET_ID $TABLE_ID"
Output logs should look similar to:
...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...
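For reference, here is a minimal, hypothetical sketch of the BigQuery write the second consumer performs for each message, using the google-cloud-bigquery Java client; the class name and row values are illustrative, and the repo's SimpleKafkaConsumer2 may differ in details.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import java.util.HashMap;
import java.util.Map;

// Illustrative class name; not part of the demo repo.
public class BigQueryInsertSketch {
    public static void main(String[] args) {
        // args: <project id> <dataset name> <table name>
        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        TableId tableId = TableId.of(args[0], args[1], args[2]);

        // One row matching the "message:STRING, timestamp:STRING" schema.
        Map<String, Object> row = new HashMap<>();
        row.put("message", "example-message");        // illustrative value
        row.put("timestamp", "2023-09-15 12:00:00");  // illustrative value

        InsertAllResponse response =
                bigquery.insertAll(InsertAllRequest.newBuilder(tableId).addRow(row).build());
        if (response.hasErrors()) {
            System.err.println("Insert errors: " + response.getInsertErrors());
        } else {
            System.out.println("Message inserted to BigQuery successfully.");
        }
    }
}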
8. Confirm that messages are successfully being written to BigQuery in the GCP console
4. Set up Pub/Sub
1. Enable Pub/Sub
gcloud services enable pubsub.googleapis.com
2. Create a Pub/Sub topic
This topic will eventually replace the Kafka topic. For simplicity, we can use the same name as the Kafka topic.
export TOPIC="my-topic"
gcloud pubsub topics create $TOPIC
5. Phased Migration
Now that we have set up our Kafka application and have a Pub/Sub topic in place for migration, we will proceed with migration from Kafka to Pub/Sub.
In this migration demo, we'll be using the Pub/Sub Group Kafka Connector from Google Cloud, which lets you migrate your Kafka infrastructure in phases.
Phase 1
Configure the Pub/Sub connector to forward all messages from the Kafka topic to the Pub/Sub topic.
1. Acquire the Kafka-to-Pub/Sub connector jar by building the connector repo
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector
cd java-pubsub-group-kafka-connector/
mvn clean package -DskipTests=True
You should see the resulting jar at target/pubsub-group-kafka-connector-${VERSION}.jar on success.
Create a variable with the full path to the jar.
export KAFKA_CONNECT_JAR="path/to/target/pubsub-group-kafka-connector-${VERSION}.jar"
2. Update your installed Kafka configuration with the Kafka Connect configuration
Change directory to your Kafka download folder from earlier:
cd kafka_2.13-3.5.1
Open config/connect-standalone.properties in the Kafka download folder, add the filepath of the downloaded connector jar to plugin.path, and uncomment the line if needed. Alternatively, you can run the command below:
echo "plugin.path=$KAFKA_CONNECT_JAR" >> config/connect-standalone.properties
3. Create a CloudPubSubSinkConnector config file with the Kafka topic, Pub/Sub project, and Pub/Sub topic needed for the migration. See the sample CloudPubSubSinkConnector config file here.
cat <<EOF > config/cps-sink-connector.properties
name=CPSSinkConnector
connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
tasks.max=10
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
topics=$TOPIC
cps.project=$PROJECT_ID
cps.topic=$TOPIC
EOF
4. Start the connector to begin forwarding messages from the Kafka topic to Pub/Sub
bin/connect-standalone.sh \
config/connect-standalone.properties \
config/cps-sink-connector.properties
Confirm on the GCP console that messages are being forwarded to your Pub/Sub topic.
Phase 2
Update the consumer applications to receive messages from the Pub/Sub topic, while your producer continues to publish messages to Kafka.
1. Update the consumer that prints out messages to the console to subscribe to Pub/Sub. In the sample kafka-to-pubsub-demo src, SimplePubsubSubscriber1 is updated to read from the Pub/Sub topic; a minimal sketch of the subscriber pattern is shown below.
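For reference, here is a minimal, hypothetical sketch of the streaming-pull subscriber pattern with the google-cloud-pubsub Java client; the class name is illustrative, and the repo's SimplePubsubSubscriber1 may differ in details.

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;

// Illustrative class name; not part of the demo repo.
public class SubscriberSketch {
    public static void main(String[] args) {
        // args: <project id> <subscription id>
        ProjectSubscriptionName subscription =
                ProjectSubscriptionName.of(args[0], args[1]);

        // Print every message's id and payload, then acknowledge it.
        MessageReceiver receiver = (PubsubMessage message, AckReplyConsumer consumer) -> {
            System.out.println("Id: " + message.getMessageId());
            System.out.println("Data: " + message.getData().toStringUtf8());
            consumer.ack();
        };

        Subscriber subscriber = Subscriber.newBuilder(subscription, receiver).build();
        subscriber.startAsync().awaitRunning();
        subscriber.awaitTerminated(); // block so the subscriber keeps receiving messages
    }
}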
Create a Pub/Sub subscription
export SUBSCRIPTION_ID="sub1"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC
Run updated subscriber application
cd kafka-to-pubsub-demo
mvn exec:java \
-Dexec.mainClass="org.pubsub.SimplePubsubSubscriber1" \
-Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID"
Output logs should look similar to:
...
Id: 8827699929893588
Data: {"message":"08afe1db-2ace-466d-bcf9-77ffc80a7f58","timestamp":"2023-09-15 15:57:34"}
Id: 8827853608001203
Data: {"message":"557960f7-5f2e-4156-84de-e270127c99de","timestamp":"2023-09-15 15:57:39"}
...
2. Update the consumer that writes to BigQuery to subscribe to Pub/Sub. In the sample kafka-to-pubsub-demo src, SimplePubsubSubscriber2 is updated to read from the Pub/Sub topic and write the messages to BigQuery.
Create a Pub/Sub subscription
export SUBSCRIPTION_ID="sub2"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC
Run updated subscriber application
cd kafka-to-pubsub-demo
mvn exec:java \
-Dexec.mainClass="org.pubsub.SimplePubsubSubscriber2" \
-Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID $DATASET_ID $TABLE_ID"
Output logs should look similar to:
...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...
Phase 3
Update your producers to publish directly to Pub/Sub.
- Update the Kafka producer src to write to Pub/Sub instead of Kafka. In the sample kafka-to-pubsub-demo src, SimplePubsubPublisher is updated to send messages to the Pub/Sub topic; a minimal sketch of the publisher loop is shown after the commands below.
- Stop the connector. You can stop the connector by terminating the running connector in the Kafka Connect terminal session.
- Run the updated publisher application
cd kafka-to-pubsub-demo
mvn exec:java \
-Dexec.mainClass="org.pubsub.SimplePubsubPublisher" \
-Dexec.args="$PROJECT_ID $TOPIC"
6. Congratulations
Congratulations, you've successfully completed the codelab on migrating self-managed Kafka applications to Pub/Sub.
Here are some links for more information