Apache Kafka से Pubsub में माइग्रेट करना

Apache Kafka से Pubsub में माइग्रेट करना

इस कोडलैब (कोड बनाना सीखने के लिए ट्यूटोरियल) के बारे में जानकारी

subjectपिछली बार अक्टू॰ 4, 2023 को अपडेट किया गया
account_circleTimothy Itodo & Sri Harshini Donthineni ने लिखा

1. परिचय

यह कोडलैब, चरण वाले माइग्रेशन तरीके का इस्तेमाल करके, Apache Kafka से Google Cloud Pubsub में ऐप्लिकेशन के माइग्रेशन को दिखाने के लिए सिलसिलेवार निर्देशों की प्रोसेस है.

Kafka और Pubsub के बीच के अंतर और सिलसिलेवार तरीके से माइग्रेट करने की प्रोसेस के बारे में ज़्यादा जानने के लिए, यहां जाएं.

आपको क्या बनाना होगा

इस डेमो में, आपको:

  • GCE पर खुद मैनेज किया जाने वाला Kafka क्लस्टर सेट अप करना
  • रैंडम स्ट्रिंग स्ट्रीम करने वाला Kafka ऐप्लिकेशन डिप्लॉय करें
  • Pub/Sub सेटअप करें
  • Pub/Sub Kafka कनेक्टर का इस्तेमाल करके Kafka से Pubsub पर माइग्रेट करना

आप इन चीज़ों के बारे में जानेंगे

  • GCE (जीसीई) पर सेल्फ़-मैनेज किए जाने वाले काफ़्का क्लस्टर सेट अप करने का तरीका
  • Kafka ऐप्लिकेशन को Pub/Sub ऐप्लिकेशन पर माइग्रेट करने का तरीका

आपको इनकी ज़रूरत होगी

  • Google Cloud Platform ऐक्सेस करें (BigQuery और Pub/Sub के लिए लिखने की अनुमतियों के साथ).
  • gcloud सीएलआई इंस्टॉल किया गया.
  • Java 8+ इंस्टॉल किया गया.

लागत

इस दस्तावेज़ में, बिल करने लायक इन प्रॉडक्ट/सेवाओं का इस्तेमाल किया जाएगा:

अपने अनुमानित इस्तेमाल के हिसाब से लागत का अनुमान जनरेट करने के लिए, प्राइसिंग कैलकुलेटर का इस्तेमाल करें.

2. Kafka को सेटअप करें

इस कोडलैब में, हम ZooKeeper का इस्तेमाल करके Kafka की शुरुआत करेंगे. आपके लोकल एनवायरमेंट में Java 8+ इंस्टॉल होना ज़रूरी है.

1. Kafka इंस्टॉल करें

Kafka डाउनलोड करें और इसे निकालें. इन्हें फ़ॉलो करने के लिए बाइनरी डाउनलोड करने का सुझाव दें:

curl -O https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz
tar -xzf kafka_2.13-3.5.1.tgz
cd kafka_2.13-3.5.1

2. ज़ूकीपर शुरू करें

bin/zookeeper-server-start.sh config/zookeeper.properties

3. ब्रोकर शुरू करें

Kafka ब्रोकर की सेवा शुरू करने के लिए, एक और टर्मिनल सेशन खोलें और चलाएं:

bin/kafka-server-start.sh config/server.properties

4. काफ़्का टॉपिक बनाएं

Kafka ऐप्लिकेशन के लिए Kafka विषय बनाएं, नया टर्मिनल सेशन खोलें और चलाएं:

export TOPIC= "my-topic"
bin/kafka-topics.sh --create --topic $TOPIC --bootstrap-server localhost:9092

5. विषय बनाने की पुष्टि करें

bin/kafka-topics.sh --describe --topic $TOPIC --bootstrap-server localhost:9092

ऊपर दिए गए cmd का आउटपुट नीचे दिए गए तरीके जैसा दिखना चाहिए:

Topic: my-topic   TopicId: gf4ena9rSmyQXMTDj1bBmQ PartitionCount: 1   ReplicationFactor: 1    Configs:
  Topic: my-topic Partition: 0    Leader: 0   Replicas: 0 Isr: 0

3. Kafka ऐप्लिकेशन बनाएं

इस कोडलैब में, हम एक java kafka ऐप्लिकेशन बनाएंगे जिसमें एक निर्माता और दो उपभोक्ता होंगे. निर्माता समय-समय पर काफ़्का के विषय के लिए रैंडम स्ट्रिंग और टाइमस्टैंप भेजता है.

अलग-अलग चरणों में माइग्रेट करने के बारे में जानकारी देने के लिए, हम इस ऐप्लिकेशन के लिए दो उपभोक्ता बनाएंगे.

  • उपभोक्ता 1 - पढ़े गए मैसेज का प्रिंट निकालता है
  • उपभोक्ता 2 - BigQuery में मैसेज लिखता है

नया टर्मिनल खोलें और इन निर्देशों का पालन करें. इन कमांड को Kafka की डाउनलोड डायरेक्ट्री में इस्तेमाल न करें

1. कॉन्स्टेंट वैरिएबल सेट करें

export PROJECT_ID="<your project id>"
export DATASET_ID="<dataset name>"
export TABLE_ID="<table name>"
export TOPIC="my-topic"

2. Kafka ऐप्लिकेशन का सोर्स डाउनलोड करें

git clone https://github.com/itodotimothy6/kafka-to-pubsub-demo.git
cd kafka
-to-pubsub-demo

3. gcloud को कॉन्फ़िगर करना और उसकी पुष्टि करना

gcloud config set project $PROJECT_ID
gcloud auth application
-default login
gcloud services enable bigquery
.googleapis.com

4. BigQuery टेबल बनाना

दूसरा उपभोक्ता इस टेबल का इस्तेमाल आउटपुट लिखने के लिए करता है. टेबल की स्कीमा की परिभाषा "मैसेज:STRING, टाइमस्टैंप:STRING" है.

bq mk --dataset --data_location US $PROJECT_ID:$DATASET_ID 
bq mk
--table $PROJECT_ID:$DATASET_ID.$TABLE_ID message:STRING,timestamp:STRING

5. विषय से जुड़े मैसेज भेजने के लिए प्रोड्यूसर चलाएं

mvn clean install
mvn exec:java \
  -Dexec.mainClass="org.kafka.SimpleKafkaProducer" \
  -Dexec.args="$TOPIC"

आउटपुट लॉग कुछ ऐसे दिखने चाहिए:

...
Message sent: {"message":"283b7961-44cd-46d4-9061-5a22b8a1bdd7","timestamp":"2023-09-15 12:17:09"}
Message sent: {"message":"e4c2110a-ebbe-4c96-88d1-56ffdc2a3e9a","timestamp":"2023-09-15 12:17:14"}
...

6. कंसोल पर, इस विषय से जुड़े मैसेज सबसे पहले लॉग आउट करने वाले उपभोक्ता को चलाएं

mvn clean install
mvn exec:java \
  -Dexec.mainClass="org.kafka.SimpleKafkaConsumer1" \
  -Dexec.args="$TOPIC"

आउटपुट लॉग कुछ ऐसे दिखने चाहिए:

...
Received message: {"message":"72d46b42-5014-4d28-a6e3-04b65de63826","timestamp":"2023-09-15 12:32:47"}
Received message: {"message":"631464dc-2651-4cce-826f-c9442beb3e98","timestamp":"2023-09-15 12:32:52"}
...

7. दूसरे उपभोक्ता को चलाने के लिए, जो काफ़्का विषय से BigQuery टेबल पर मैसेज लिखता है

mvn clean install
mvn exec:java \
  -Dexec.mainClass="org.kafka.SimpleKafkaConsumer2" \
  -Dexec.args="$TOPIC $PROJECT_ID $DATASET_ID $TABLE_ID"

आउटपुट लॉग कुछ ऐसे दिखने चाहिए:

...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...

8. पुष्टि करें कि GCP कंसोल में, BigQuery में मैसेज लिखे जा रहे हैं

8734b356c59543af.png

4. Pubsub सेटअप करें

1. Pubsub चालू करें

gcloud services enable pubsub.googleapis.com

2. Pubsub विषय बनाएं

बाद में, यह विषय काफ़्का विषय की जगह ले लेगा. इसे आसानी से समझने के लिए, हम उसी नाम का इस्तेमाल कर सकते हैं जो काफ़्का टॉपिक के लिए है

export TOPIC = "my-topic"
gcloud pubsub topics create $TOPIC

5. अलग-अलग चरणों में माइग्रेशन

हमने Kafka ऐप्लिकेशन सेट अप कर लिया है और माइग्रेशन के लिए Pub/Sub विषय सेट अप कर लिया गया है. इसलिए, अब हम Kafka से Pub/Sub पर माइग्रेट करेंगे.

माइग्रेशन के इस डेमो में, हम Google Cloud Pub/Sub ग्रुप का Pub/Sub Kafka Connector इस्तेमाल करेंगे. इससे आपको अपने काफ़्का इन्फ़्रास्ट्रक्चर को अलग-अलग चरणों में माइग्रेट करने की सुविधा मिलेगी.

पहला चरण

Kafka विषय से Pub/Sub विषय पर सभी मैसेज फ़ॉरवर्ड करने के लिए, Pub/Sub कनेक्टर को कॉन्फ़िगर करें

1. कनेक्टर रेपो बनाकर, kafka-to-pubsub कनेक्टर जार को हासिल करें

git clone https://github.com/googleapis/java-pubsub-group-kafka-connector
cd java-pubsub-group-kafka-connector/
mvn clean package -DskipTests=True

सफल होने पर, आपको target/pubsub-group-kafka-connector-${VERSION}.jar पर जार दिखेगा.

जार के पूरे पाथ के साथ एक वैरिएबल बनाएं.

export KAFKA_CONNECT_JAR="path/to/target/pubsub-group-kafka-connector-${VERSION}.jar"

2. Kafka Connect के कॉन्फ़िगरेशन की मदद से, इंस्टॉल किए गए Kafka कॉन्फ़िगरेशन को अपडेट करना

डायरेक्ट्री को पहले से अपने kafka डाउनलोड फ़ोल्डर में बदलें

cd kafka_2.13-3.5.1

/config/connect-standalone.properties को Kafka डाउनलोड फ़ोल्डर में खोलें और डाउनलोड किए गए कनेक्टर जार के फ़ाइल पाथ को प्लगिन.पाथ में जोड़ें. इसके बाद, अगर ज़रूरी हो, तो लाइन में से टिप्पणी हटाएं. वैकल्पिक रूप से, आप नीचे दिए गए cmd को चला सकते हैं

echo "plugin.path=$KAFKA_CONNECT_JAR" >> config/connect-standalone.properties

3. CloudPubSubSinkConnector माइग्रेशन के लिए ज़रूरी kafka विषय, pubsub प्रोजेक्ट, और pubsub विषय के साथ एक कॉन्फ़िगरेशन फ़ाइल बनाएं. सैंपल देखने के लिए, CloudPubSubSinkConnector कॉन्फ़िगरेशन फ़ाइल यहां देखें.

cat <<EOF > config/cps-sink-connector.properties
name
=CPSSinkConnector
connector
.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
tasks
.max=10
key
.converter=org.apache.kafka.connect.storage.StringConverter
value
.converter=org.apache.kafka.connect.converters.ByteArrayConverter
topics
=$TOPIC
cps
.project=$PROJECT_ID
cps
.topic=$TOPIC
EOF

4. Kafka विषय से Pub/Sub पर मैसेज फ़ॉरवर्ड करने के लिए, कनेक्टर शुरू करें

bin/connect-standalone.sh \
config
/connect-standalone.properties \
config
/cps-sink-connector.properties

GCP कंसोल पर पुष्टि करें कि आपके Pub/Sub विषय पर मैसेज फ़ॉरवर्ड किए जा रहे हैं

दूसरा चरण

Pub/Sub विषय से जुड़े मैसेज पाने के लिए, उपभोक्ता ऐप्लिकेशन अपडेट करें. इस दौरान, आपका निर्माता Kafka पर मैसेज पब्लिश करना जारी रखेगा

1. Pub/Sub की सदस्यता लेने के लिए, कंसोल में मैसेज को प्रिंट करने वाले उपभोक्ता को अपडेट करें. सैंपल में kafka-to-pubsub-demo src, SimplePubsubscriber1 को Pubsub विषय से पढ़ने के लिए अपडेट किया गया है.

Pub/Sub सदस्यता बनाना

export SUBSCRIPTION_ID="sub1"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC

अपडेट किया गया सदस्य ऐप्लिकेशन चलाएं

cd kafka-to-pubsub-demo
mvn exec:java \
  -Dexec.mainClass="org.pubsub.SimplePubsubSubscriber1" \
  -Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID"

आउटपुट लॉग इसके जैसे दिखने चाहिए

...
Id: 8827699929893588
Data: {"message":"08afe1db-2ace-466d-bcf9-77ffc80a7f58","timestamp":"2023-09-15 15:57:34"}
Id: 8827853608001203
Data: {"message":"557960f7-5f2e-4156-84de-e270127c99de","timestamp":"2023-09-15 15:57:39"}
...

2. BigQuery पर लिखने वाले उपभोक्ता को Pub/Sub की सदस्यता लेने के लिए अपडेट करना. सैंपल में kafka-to-pubsub-demo src, SimplePubsubscriber1 को Pubsub विषय से पढ़ने के लिए अपडेट किया गया है.

Pub/Sub सदस्यता बनाना

export SUBSCRIPTION_ID="sub2"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC

अपडेट किया गया सदस्य ऐप्लिकेशन चलाएं

cd kafka-to-pubsub-demo
mvn exec:java \
  -Dexec.mainClass="org.pubsub.SimplePubsubSubscriber2" \
  -Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID $DATASET_ID $TABLE_ID"

आउटपुट लॉग इसके जैसे दिखने चाहिए

...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...

तीसरा चरण

सीधे तौर पर Pub/Sub पर कॉन्टेंट पब्लिश करने के लिए, अपने प्रोड्यूसर को अपडेट करें

  1. Kafka की जगह Pub/Sub लिखने के लिए, Kafka प्रोड्यूसर सोर्स को अपडेट करें. सैंपल kafka-to-pubsub-demo में, SimplePubsubPublisher को Pubsub विषय पर मैसेज भेजने के लिए अपडेट किया गया है.
  2. कनेक्टर बंद करें. काफ़्का-कनेक्ट टर्मिनल सेशन में, चल रहे कनेक्टर को खत्म करके, कनेक्टर को बंद किया जा सकता है
  3. अपडेट किया गया प्रकाशक ऐप्लिकेशन चलाएं
cd kafka-to-pubsub-demo
mvn exec:java \
  -Dexec.mainClass="org.pubsub.SimplePubsubPublisher" \
  -Dexec.args="$PROJECT_ID $TOPIC"

6. बधाई हो

बधाई हो, आपने खुद से मैनेज किए जाने वाले Kafka ऐप्लिकेशन को Pub/Sub में माइग्रेट करने के लिए कोडलैब (कोड बनाना सीखना) पूरा कर लिया है.

ज़्यादा जानकारी के लिए यहां कुछ लिंक दिए गए हैं