Apache Kafka'dan Pubsub'a taşıma

1. Giriş

Bu codelab'de, Aşamalı Taşıma yaklaşımı kullanılarak Apache Kafka'dan Google Cloud Pubsub'a uygulama taşınmasını gösteren adım adım açıklamalı bir kılavuz bulunmaktadır.

Kafka ile Pubsub ile Aşamalı Taşıma yaklaşımı arasındaki farklılıklar hakkında daha fazla bilgiye buradan ulaşabilirsiniz.

Neler oluşturacaksınız?

Bu demoda şunları yapacaksınız:

  • GCE'de kendi kendine yönetilen Kafka kümesi oluşturma
  • Rastgele dize akışı sağlayan basit bir Kafka uygulaması dağıtma
  • Pub/Sub'ı kurun
  • Pub/Sub Kafka Connector'ı kullanarak Kafka'dan Pubsub'a geçiş

Neler öğreneceksiniz?

  • GCE'de kendi kendine yönetilen kafka kümesi oluşturma
  • Kafka uygulamasını Pub/Sub uygulamasına taşıma

Gerekenler

  • Google Cloud Platform'a erişim (BigQuery ve Pub/Sub için yazma izinleriyle).
  • gcloud CLI yüklendi.
  • Java 8+ yüklendi.

Maliyet

Bu belgede aşağıdaki faturalandırılabilir ürünleri/hizmetleri kullanacaksınız:

Öngörülen kullanımınıza göre bir maliyet tahmini oluşturmak için fiyat hesaplayıcıyı kullanın.

2. Kafka'yı kurma

Bu codelab'de ZooKeeper'ı kullanarak Kafka'yı başlatacağız. Yerel ortamınızda Java 8+ yüklü olmalıdır.

1. Kafka'yı yükleme

Kafka'yı indirip çıkarın. Uygulanması için ikili indirme işlemini önerin:

curl -O https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz
tar -xzf kafka_2.13-3.5.1.tgz
cd kafka_2.13-3.5.1

2. Hayvanat bahçesi bakıcısına başla

bin/zookeeper-server-start.sh config/zookeeper.properties

3. Komisyoncuyu başlat

Kafka acente hizmetini başlatmak için başka bir terminal oturumu açın ve şu komutu çalıştırın:

bin/kafka-server-start.sh config/server.properties

4. Kafka konusu oluştur

Kafka uygulaması için Kafka konusu oluşturun, yeni bir terminal oturumu açın ve şu komutu çalıştırın:

export TOPIC= "my-topic"
bin/kafka-topics.sh --create --topic $TOPIC --bootstrap-server localhost:9092

5. Konu oluşturmayı onayla

bin/kafka-topics.sh --describe --topic $TOPIC --bootstrap-server localhost:9092

Yukarıdaki cmd'nin çıktısı aşağıdakine benzer olmalıdır:

Topic: my-topic   TopicId: gf4ena9rSmyQXMTDj1bBmQ PartitionCount: 1   ReplicationFactor: 1    Configs:
  Topic: my-topic Partition: 0    Leader: 0   Replicas: 0 Isr: 0

3. Kafka Uygulaması Oluşturma

Bu codelab'de 1 üretici ve 2 tüketicisi olan bir Java kafka uygulaması oluşturacağız. Üretici düzenli aralıklarla kafka konusuna rastgele dizeler ve bir zaman damgası gönderir.

Aşamalı taşıma işlemini göstermek amacıyla bu uygulama için 2 tüketici oluşturacağız.

  • Tüketici 1 - Okunan iletileri yazdırır
  • 2. Tüketici - Mesajları BigQuery'ye yazar

Yeni bir terminal açın ve aşağıdaki komutları çalıştırın. Bu komutları Kafka indirme dizininde çalıştırmayın

1. Sabit değerler belirleme

export PROJECT_ID="<your project id>"
export DATASET_ID="<dataset name>"
export TABLE_ID="<table name>"
export TOPIC="my-topic"

2. Kafka uygulamasını indirin src

git clone https://github.com/itodotimothy6/kafka-to-pubsub-demo.git
cd kafka-to-pubsub-demo

3. gcloud'u yapılandırma ve kimliğini doğrulama

gcloud config set project $PROJECT_ID
gcloud auth application-default login
gcloud services enable bigquery.googleapis.com

4. BigQuery tablosu oluşturma

Bu tablo, ikinci tüketici tarafından çıkışı yazmak için kullanılır. Tablonun şema tanımı şu şekildedir: "message:STRING, timestamp:STRING".

bq mk --dataset --data_location US $PROJECT_ID:$DATASET_ID 
bq mk --table $PROJECT_ID:$DATASET_ID.$TABLE_ID message:STRING,timestamp:STRING

5. Konuya mesaj göndermeye başlamak için üreticiyi çalıştırın

mvn clean install
mvn exec:java \
  -Dexec.mainClass="org.kafka.SimpleKafkaProducer" \
  -Dexec.args="$TOPIC"

Çıkış günlükleri şuna benzer olmalıdır:

...
Message sent: {"message":"283b7961-44cd-46d4-9061-5a22b8a1bdd7","timestamp":"2023-09-15 12:17:09"}
Message sent: {"message":"e4c2110a-ebbe-4c96-88d1-56ffdc2a3e9a","timestamp":"2023-09-15 12:17:14"}
...

6. Konudaki mesajların oturumunu konsolda kapatan ilk tüketiciyi çalıştırın

mvn clean install
mvn exec:java \
  -Dexec.mainClass="org.kafka.SimpleKafkaConsumer1" \
  -Dexec.args="$TOPIC"

Çıkış günlükleri aşağıdakine benzer olmalıdır:

...
Received message: {"message":"72d46b42-5014-4d28-a6e3-04b65de63826","timestamp":"2023-09-15 12:32:47"}
Received message: {"message":"631464dc-2651-4cce-826f-c9442beb3e98","timestamp":"2023-09-15 12:32:52"}
...

7. Kafka konusundan bir BigQuery tablosuna mesaj yazan ikinci tüketiciyi çalıştırma

mvn clean install
mvn exec:java \
  -Dexec.mainClass="org.kafka.SimpleKafkaConsumer2" \
  -Dexec.args="$TOPIC $PROJECT_ID $DATASET_ID $TABLE_ID"

Çıkış günlükleri aşağıdakine benzer olmalıdır:

...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...

8. Mesajların GCP Console'da BigQuery'ye başarıyla yazıldığını doğrulayın

8734b356c59543af.png

4. Pubsub'u kurun

1. Pubsub'u etkinleştir

gcloud services enable pubsub.googleapis.com

2. Pubsub Konusu Oluştur

Bu konu zaman içinde kafka konusunun yerini alacaktır. Basit olması için kafka konusuyla aynı adı kullanabiliriz

export TOPIC = "my-topic"
gcloud pubsub topics create $TOPIC

5. Aşamalı Taşıma

Artık Kafka uygulamamızı yapılandırdığımıza ve taşıma için bir Pub/Sub konumuzu koyduğumuza göre, Kafka'dan Pub/Sub'a geçiş işlemine devam edeceğiz.

Bu taşıma demosunda, kafka altyapınızı aşamalı olarak taşımanızı sağlayan Google Cloud Pub/Sub Grubu'nun Pub/Sub Kafka Connector'ını kullanacağız.

1. Aşama

Pub/Sub bağlayıcısını Kafka konusundan tüm mesajları Pub/Sub konusuna yönlendirecek şekilde yapılandırma

1. Bağlayıcı deposunu oluşturarak kafka-pubsub bağlayıcı kavanozunu edinin

git clone https://github.com/googleapis/java-pubsub-group-kafka-connector
cd java-pubsub-group-kafka-connector/
mvn clean package -DskipTests=True

Oluşturulan jar, target/pubsub-group-kafka-connector-${VERSION}.jar adresinde başarıyla görünecektir.

Jarza giden tam yolu kullanarak bir değişken oluşturun.

export KAFKA_CONNECT_JAR="path/to/target/pubsub-group-kafka-connector-${VERSION}.jar"

2. Yüklü Kafka yapılandırmalarınızı Kafka Connect yapılandırmalarıyla güncelleme

Dizini, önceki kafka indirme klasörünüzle değiştirin

cd kafka_2.13-3.5.1

Kafka indirme klasöründe /config/connect-standalone.properties dosyasını açın, indirilen bağlayıcı jar'ın dosya yolunu eklenti.path dosyasına ekleyin ve gerekirse satırın açıklamasını kaldırın. Alternatif olarak aşağıdaki cmd komutunu çalıştırabilirsiniz

echo "plugin.path=$KAFKA_CONNECT_JAR" >> config/connect-standalone.properties

3. Taşıma için gereken kafka konusu, pubsub projesi ve pubsub konusunu içeren bir CloudPubSubSinkConnector yapılandırma dosyası oluşturun. Örnek CloudPubSubSinkConnector yapılandırma dosyasını burada bulabilirsiniz.

cat <<EOF > config/cps-sink-connector.properties
name=CPSSinkConnector
connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
tasks.max=10
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
topics=$TOPIC
cps.project=$PROJECT_ID
cps.topic=$TOPIC
EOF

4. Kafka konusundan Pub/Sub'a mesaj yönlendirmeye başlamak için bağlayıcıyı başlatın

bin/connect-standalone.sh \
config/connect-standalone.properties \
config/cps-sink-connector.properties

GCP Console'da mesajların Pub/Sub konunuza yönlendirildiğini onaylayın

2. Aşama

Üreticiniz Kafka'da mesaj yayınlamaya devam ederken tüketici uygulamalarını Pub/Sub konusundan mesaj alacak şekilde güncelleme

1. Mesajları konsola yazdıran tüketiciyi güncelleyerek Pub/Sub'a abone olmasını sağlayın. Örnekte kafka-to-pubsub-demo src, SimplePubsubscriber1 Pubsub konusundan okunacak şekilde güncellenmiştir.

Pub/Sub aboneliği oluşturma

export SUBSCRIPTION_ID="sub1"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC

Güncellenen abone uygulamasını çalıştırma

cd kafka-to-pubsub-demo
mvn exec:java \
  -Dexec.mainClass="org.pubsub.SimplePubsubSubscriber1" \
  -Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID"

Çıkış günlükleri şuna benzer olmalıdır:

...
Id: 8827699929893588
Data: {"message":"08afe1db-2ace-466d-bcf9-77ffc80a7f58","timestamp":"2023-09-15 15:57:34"}
Id: 8827853608001203
Data: {"message":"557960f7-5f2e-4156-84de-e270127c99de","timestamp":"2023-09-15 15:57:39"}
...

2. BigQuery'ye yazan tüketiciyi Pub/Sub'a abone olmak için güncelleyin. Örnekte kafka-to-pubsub-demo src, SimplePubsubscriber1 Pubsub konusundan okunacak şekilde güncellenmiştir.

Pub/Sub aboneliği oluşturma

export SUBSCRIPTION_ID="sub2"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC

Güncellenen abone uygulamasını çalıştırma

cd kafka-to-pubsub-demo
mvn exec:java \
  -Dexec.mainClass="org.pubsub.SimplePubsubSubscriber2" \
  -Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID $DATASET_ID $TABLE_ID"

Çıkış günlükleri şuna benzer olmalıdır:

...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...

3. Aşama

Üreticilerinizi doğrudan Pub/Sub'da yayınlayacak şekilde güncelleme

  1. Kafka yerine Pub/Sub'a yazmak için Kafka üretici src'yi güncelleyin. Örnek kafka-to-pubsub-demo src'de SimplePubsubPublisher, Pubsub konusuna mesaj göndermek için güncellenmiştir.
  2. Bağlayıcıyı durdurun. kafka-connect terminal oturumunda çalışan bağlayıcıyı sonlandırarak bağlayıcıyı durdurabilirsiniz.
  3. Güncellenen yayıncı uygulamasını çalıştırma
cd kafka-to-pubsub-demo
mvn exec:java \
  -Dexec.mainClass="org.pubsub.SimplePubsubPublisher" \
  -Dexec.args="$PROJECT_ID $TOPIC"

6. Tebrikler

Tebrikler, kendi kendini yöneten Kafka uygulamalarını Pub/Sub'a taşıma konusunda codelab'i başarıyla tamamladınız.

Daha fazla bilgi için bazı bağlantılar