1. परिचय
इस कोडलैब में, फ़ेज़ के हिसाब से माइग्रेशन के तरीके का इस्तेमाल करके, ऐप्लिकेशन को Apache Kafka से Google Cloud Pub/Sub पर माइग्रेट करने के बारे में सिलसिलेवार तरीके से बताया गया है.
Kafka और Pubsub के बीच के अंतर और फ़ेज़ के हिसाब से माइग्रेट करने के तरीके के बारे में ज़्यादा जानने के लिए, यहां जाएं.
आपको क्या बनाना है
इस डेमो में, आपको यह जानकारी मिलेगी:
- GCE पर, खुद मैनेज किया जाने वाला Kafka क्लस्टर सेट अप करना
- एक सामान्य Kafka ऐप्लिकेशन डिप्लॉय करें, जो रैंडम स्ट्रिंग स्ट्रीम करता है
- Pub/Sub सेट अप करना
- Pub/Sub Kafka Connector का इस्तेमाल करके, Kafka से Pubsub पर माइग्रेट करना
आपको क्या सीखने को मिलेगा
- GCE पर, खुद मैनेज किए जाने वाले Kafka क्लस्टर को सेट अप करने का तरीका
- Kafka ऐप्लिकेशन को Pub/Sub ऐप्लिकेशन पर माइग्रेट करने का तरीका
आपको किन चीज़ों की ज़रूरत होगी
- Google Cloud Platform का ऐक्सेस (BigQuery और Pub/Sub के लिए, फ़ाइल या एंट्री में बदलाव करने की अनुमतियों के साथ).
- gcloud सीएलआई इंस्टॉल हो गया हो.
- Java 8 या उसके बाद का वर्शन इंस्टॉल हो.
लागत
इस दस्तावेज़ में, बिल किए जाने वाले इन प्रॉडक्ट/सेवाओं का इस्तेमाल किया जाएगा:
अपने अनुमानित इस्तेमाल के आधार पर लागत का अनुमान जनरेट करने के लिए, प्राइसिंग कैलकुलेटर का इस्तेमाल करें.
2. Kafka सेट अप करना
इस कोडलैब में, हम ZooKeeper का इस्तेमाल करके Kafka शुरू करेंगे. आपके लोकल एनवायरमेंट में Java 8 या इसके बाद का वर्शन इंस्टॉल होना चाहिए.
1. Kafka इंस्टॉल करना
Kafka डाउनलोड करें और उसे एक्सट्रैक्ट करें. साथ-साथ काम करने के लिए, बाइनरी डाउनलोड करने का सुझाव दें:
curl -O https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz
tar -xzf kafka_2.13-3.5.1.tgz
cd kafka_2.13-3.5.1
2. ज़ूकीपर शुरू करें
bin/zookeeper-server-start.sh config/zookeeper.properties
3. ब्रोकर शुरू करें
Kafka ब्रोकर सेवा शुरू करने के लिए, कोई दूसरा टर्मिनल सेशन खोलें और यह कमांड चलाएं:
bin/kafka-server-start.sh config/server.properties
4. Kafka विषय बनाना
Kafka ऐप्लिकेशन के लिए Kafka विषय बनाएं. इसके बाद, नया टर्मिनल सेशन खोलें और यह कमांड चलाएं:
export TOPIC= "my-topic"
bin/kafka-topics.sh --create --topic $TOPIC --bootstrap-server localhost:9092
5. विषय बनाने की पुष्टि करें
bin/kafka-topics.sh --describe --topic $TOPIC --bootstrap-server localhost:9092
ऊपर दिए गए cmd का आउटपुट, यहां दिए गए आउटपुट जैसा दिखना चाहिए:
Topic: my-topic TopicId: gf4ena9rSmyQXMTDj1bBmQ PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: my-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
3. Kafka ऐप्लिकेशन बनाना
इस कोडलैब में, हम एक Java Kafka ऐप्लिकेशन बनाएंगे. इसमें एक प्रोड्यूसर और दो कंज्यूमर होंगे. प्रोड्यूसर, समय-समय पर काफ़्का विषय को रैंडम स्ट्रिंग और टाइमस्टैंप भेजता है.
फ़ेज़ के हिसाब से माइग्रेट करने की प्रोसेस दिखाने के लिए, हम इस ऐप्लिकेशन के लिए दो उपभोक्ता खाते बनाएंगे.
- उपयोगकर्ता 1 - पढ़े गए मैसेज प्रिंट करता है
- उपयोगकर्ता 2 - BigQuery में मैसेज लिखता है
एक नया टर्मिनल खोलें और ये कमांड चलाएं. इन कमांड को Kafka की डाउनलोड डायरेक्ट्री में न चलाएं
1. कॉन्स्टेंट वैरिएबल सेट करना
export PROJECT_ID="<your project id>"
export DATASET_ID="<dataset name>"
export TABLE_ID="<table name>"
export TOPIC="my-topic"
2. Kafka ऐप्लिकेशन src डाउनलोड करें
git clone https://github.com/itodotimothy6/kafka-to-pubsub-demo.git
cd kafka-to-pubsub-demo
3. gcloud को कॉन्फ़िगर करना और उसकी पुष्टि करना
gcloud config set project $PROJECT_ID
gcloud auth application-default login
gcloud services enable bigquery.googleapis.com
4. BigQuery टेबल बनाना
इस टेबल का इस्तेमाल दूसरा उपभोक्ता, आउटपुट लिखने के लिए करता है. टेबल का स्कीमा डेफ़िनिशन "message:STRING, timestamp:STRING" है.
bq mk --dataset --data_location US $PROJECT_ID:$DATASET_ID
bq mk --table $PROJECT_ID:$DATASET_ID.$TABLE_ID message:STRING,timestamp:STRING
5. विषय पर मैसेज भेजना शुरू करने के लिए, प्रोड्यूसर को चलाएं
mvn clean install
mvn exec:java \
-Dexec.mainClass="org.kafka.SimpleKafkaProducer" \
-Dexec.args="$TOPIC"
आउटपुट लॉग कुछ इस तरह दिखने चाहिए:
...
Message sent: {"message":"283b7961-44cd-46d4-9061-5a22b8a1bdd7","timestamp":"2023-09-15 12:17:09"}
Message sent: {"message":"e4c2110a-ebbe-4c96-88d1-56ffdc2a3e9a","timestamp":"2023-09-15 12:17:14"}
...
6. उस पहले उपभोक्ता को चलाएं जो विषय में लॉग आउट किए गए मैसेज को कंसोल में भेजता है
mvn clean install
mvn exec:java \
-Dexec.mainClass="org.kafka.SimpleKafkaConsumer1" \
-Dexec.args="$TOPIC"
आउटपुट लॉग कुछ इस तरह दिखने चाहिए:
...
Received message: {"message":"72d46b42-5014-4d28-a6e3-04b65de63826","timestamp":"2023-09-15 12:32:47"}
Received message: {"message":"631464dc-2651-4cce-826f-c9442beb3e98","timestamp":"2023-09-15 12:32:52"}
...
7. दूसरे उपभोक्ता को चलाएं, जो Kafka विषय से BigQuery टेबल में मैसेज लिखता है
mvn clean install
mvn exec:java \
-Dexec.mainClass="org.kafka.SimpleKafkaConsumer2" \
-Dexec.args="$TOPIC $PROJECT_ID $DATASET_ID $TABLE_ID"
आउटपुट लॉग कुछ इस तरह दिखने चाहिए:
...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...
8. पुष्टि करें कि GCP कंसोल में, BigQuery में मैसेज लिखे जा रहे हैं

4. Pubsub सेट अप करना
1. Pubsub चालू करना
gcloud services enable pubsub.googleapis.com
2. Pubsub विषय बनाना
यह विषय, Kafka विषय की जगह ले लेगा. आसानी के लिए, हम काफ़्का विषय के तौर पर उसी नाम का इस्तेमाल कर सकते हैं
export TOPIC = "my-topic"
gcloud pubsub topics create $TOPIC
5. फ़ेज़ के हिसाब से माइग्रेशन
अब हमने अपना Kafka ऐप्लिकेशन सेट अप कर लिया है और माइग्रेशन के लिए Pub/Sub विषय बना लिया है. अब हम Kafka से Pub/Sub पर माइग्रेट करेंगे.
माइग्रेशन के इस डेमो में, हम Google Cloud Pub/Sub ग्रुप के Pub/Sub Kafka Connector का इस्तेमाल करेंगे. इसकी मदद से, अपने Kafka इन्फ़्रास्ट्रक्चर को चरणों में माइग्रेट किया जा सकता है.
पहला चरण
Pub/Sub कनेक्टर को कॉन्फ़िगर करें, ताकि Kafka विषय से Pub/Sub विषय पर सभी मैसेज फ़ॉरवर्ड किए जा सकें
1. कनेक्टर repo बनाकर, kafka-to-pubsub कनेक्टर जार पाएं
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector
cd java-pubsub-group-kafka-connector/
mvn clean package -DskipTests=True
सफल होने पर, आपको target/pubsub-group-kafka-connector-${VERSION}.jar पर जार फ़ाइल दिखेगी.
जार के पूरे पाथ के साथ एक वैरिएबल बनाएं.
export KAFKA_CONNECT_JAR="path/to/target/pubsub-group-kafka-connector-${VERSION}.jar"
2. Kafka Connect कॉन्फ़िगरेशन की मदद से, इंस्टॉल किए गए Kafka कॉन्फ़िगरेशन अपडेट करना
डायरेक्ट्री को पहले से डाउनलोड किए गए Kafka फ़ोल्डर में बदलें
cd kafka_2.13-3.5.1
Kafka के डाउनलोड फ़ोल्डर में /config/connect-standalone.properties खोलें. इसके बाद, डाउनलोड किए गए कनेक्टर जार का फ़ाइलपाथ, plugin.path में जोड़ें. अगर ज़रूरी हो, तो लाइन से टिप्पणी हटाएं. इसके अलावा, नीचे दिए गए cmd को भी चलाया जा सकता है
echo "plugin.path=$KAFKA_CONNECT_JAR" >> config/connect-standalone.properties
3. माइग्रेशन के लिए ज़रूरी काफ़्का विषय, Pub/Sub प्रोजेक्ट, और Pub/Sub विषय के साथ CloudPubSubSinkConnector कॉन्फ़िगरेशन फ़ाइल बनाएं. सैंपल CloudPubSubSinkConnector कॉन्फ़िगरेशन फ़ाइल यहां देखें.
cat <<EOF > config/cps-sink-connector.properties
name=CPSSinkConnector
connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
tasks.max=10
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
topics=$TOPIC
cps.project=$PROJECT_ID
cps.topic=$TOPIC
EOF
4. Kafka विषय से Pub/Sub पर मैसेज फ़ॉरवर्ड करने के लिए, कनेक्टर शुरू करें
bin/connect-standalone.sh \
config/connect-standalone.properties \
config/cps-sink-connector.properties
GCP कंसोल पर पुष्टि करें कि मैसेज आपके Pub/Sub विषय पर फ़ॉरवर्ड किए जा रहे हैं
दूसरा चरण
Pub/Sub विषय से मैसेज पाने के लिए, उपभोक्ता ऐप्लिकेशन अपडेट करें. इस दौरान, आपका प्रोड्यूसर Kafka पर मैसेज पब्लिश करना जारी रखेगा
1. कंज्यूमर को अपडेट करें, ताकि वह Pub/Sub की सदस्यता ले सके. यह कंज्यूमर, कंसोल पर मैसेज प्रिंट करता है. सैंपल kafka-to-pubsub-demo में, src SimplePubsubscriber1 को Pubsub विषय से पढ़ने के लिए अपडेट किया गया है.
Pub/Sub की सदस्यता बनाना
export SUBSCRIPTION_ID="sub1"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC
सदस्यता लेने वाले लोगों के लिए अपडेट किया गया ऐप्लिकेशन चलाना
cd kafka-to-pubsub-demo
mvn exec:java \
-Dexec.mainClass="org.pubsub.SimplePubsubSubscriber1" \
-Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID"
आउटपुट लॉग, इस तरह दिखने चाहिए
...
Id: 8827699929893588
Data: {"message":"08afe1db-2ace-466d-bcf9-77ffc80a7f58","timestamp":"2023-09-15 15:57:34"}
Id: 8827853608001203
Data: {"message":"557960f7-5f2e-4156-84de-e270127c99de","timestamp":"2023-09-15 15:57:39"}
...
2. BigQuery में डेटा लिखने वाले उपभोक्ता को Pub/Sub की सदस्यता लेने के लिए अपडेट करें. सैंपल kafka-to-pubsub-demo में, src SimplePubsubscriber1 को Pubsub विषय से पढ़ने के लिए अपडेट किया गया है.
Pub/Sub की सदस्यता बनाना
export SUBSCRIPTION_ID="sub2"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC
सदस्यता लेने वाले लोगों के लिए अपडेट किया गया ऐप्लिकेशन चलाना
cd kafka-to-pubsub-demo
mvn exec:java \
-Dexec.mainClass="org.pubsub.SimplePubsubSubscriber2" \
-Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID $DATASET_ID $TABLE_ID"
आउटपुट लॉग, इस तरह दिखने चाहिए
...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...
तीसरा चरण
Pub/Sub पर सीधे तौर पर पब्लिश करने के लिए, अपने प्रोड्यूसर अपडेट करें
- Kafka प्रोड्यूसर के src को अपडेट करें, ताकि Kafka के बजाय Pub/Sub में लिखा जा सके. सैंपल
kafka-to-pubsub-demosrc में, Pubsub विषय पर मैसेज भेजने के लिएSimplePubsubPublisherको अपडेट किया जाता है. - कनेक्टर को बंद करें. कनेक्टर को रोकने के लिए, kafka-connect टर्मिनल सेशन में चल रहे कनेक्टर को बंद करें
- पब्लिशर के अपडेट किए गए ऐप्लिकेशन को चलाएं
cd kafka-to-pubsub-demo
mvn exec:java \
-Dexec.mainClass="org.pubsub.SimplePubsubPublisher" \
-Dexec.args="$PROJECT_ID $TOPIC"
6. बधाई हो
बधाई हो, आपने सेल्फ़-मैनेज किए गए Kafka ऐप्लिकेशन को Pub/Sub पर माइग्रेट करने से जुड़ा कोडलैब पूरा कर लिया है.
ज़्यादा जानकारी के लिए यहां कुछ लिंक दिए गए हैं