1. Giriş
Bu codelab'de, Aşamalı Taşıma yaklaşımı kullanılarak Apache Kafka'dan Google Cloud Pubsub'a uygulama taşınmasını gösteren adım adım açıklamalı bir kılavuz bulunmaktadır.
Kafka ile Pubsub ile Aşamalı Taşıma yaklaşımı arasındaki farklılıklar hakkında daha fazla bilgiye buradan ulaşabilirsiniz.
Neler oluşturacaksınız?
Bu demoda şunları yapacaksınız:
- GCE'de kendi kendine yönetilen Kafka kümesi oluşturma
- Rastgele dize akışı sağlayan basit bir Kafka uygulaması dağıtma
- Pub/Sub'ı kurun
- Pub/Sub Kafka Connector'ı kullanarak Kafka'dan Pubsub'a geçiş
Neler öğreneceksiniz?
- GCE'de kendi kendine yönetilen kafka kümesi oluşturma
- Kafka uygulamasını Pub/Sub uygulamasına taşıma
Gerekenler
- Google Cloud Platform'a erişim (BigQuery ve Pub/Sub için yazma izinleriyle).
- gcloud CLI yüklendi.
- Java 8+ yüklendi.
Maliyet
Bu belgede aşağıdaki faturalandırılabilir ürünleri/hizmetleri kullanacaksınız:
Öngörülen kullanımınıza göre bir maliyet tahmini oluşturmak için fiyat hesaplayıcıyı kullanın.
2. Kafka'yı kurma
Bu codelab'de ZooKeeper'ı kullanarak Kafka'yı başlatacağız. Yerel ortamınızda Java 8+ yüklü olmalıdır.
1. Kafka'yı yükleme
Kafka'yı indirip çıkarın. Uygulanması için ikili indirme işlemini önerin:
curl -O https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz
tar -xzf kafka_2.13-3.5.1.tgz
cd kafka_2.13-3.5.1
2. Hayvanat bahçesi bakıcısına başla
bin/zookeeper-server-start.sh config/zookeeper.properties
3. Komisyoncuyu başlat
Kafka acente hizmetini başlatmak için başka bir terminal oturumu açın ve şu komutu çalıştırın:
bin/kafka-server-start.sh config/server.properties
4. Kafka konusu oluştur
Kafka uygulaması için Kafka konusu oluşturun, yeni bir terminal oturumu açın ve şu komutu çalıştırın:
export TOPIC= "my-topic"
bin/kafka-topics.sh --create --topic $TOPIC --bootstrap-server localhost:9092
5. Konu oluşturmayı onayla
bin/kafka-topics.sh --describe --topic $TOPIC --bootstrap-server localhost:9092
Yukarıdaki cmd'nin çıktısı aşağıdakine benzer olmalıdır:
Topic: my-topic TopicId: gf4ena9rSmyQXMTDj1bBmQ PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: my-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
3. Kafka Uygulaması Oluşturma
Bu codelab'de 1 üretici ve 2 tüketicisi olan bir Java kafka uygulaması oluşturacağız. Üretici düzenli aralıklarla kafka konusuna rastgele dizeler ve bir zaman damgası gönderir.
Aşamalı taşıma işlemini göstermek amacıyla bu uygulama için 2 tüketici oluşturacağız.
- Tüketici 1 - Okunan iletileri yazdırır
- 2. Tüketici - Mesajları BigQuery'ye yazar
Yeni bir terminal açın ve aşağıdaki komutları çalıştırın. Bu komutları Kafka indirme dizininde çalıştırmayın
1. Sabit değerler belirleme
export PROJECT_ID="<your project id>"
export DATASET_ID="<dataset name>"
export TABLE_ID="<table name>"
export TOPIC="my-topic"
2. Kafka uygulamasını indirin src
git clone https://github.com/itodotimothy6/kafka-to-pubsub-demo.git
cd kafka-to-pubsub-demo
3. gcloud'u yapılandırma ve kimliğini doğrulama
gcloud config set project $PROJECT_ID
gcloud auth application-default login
gcloud services enable bigquery.googleapis.com
4. BigQuery tablosu oluşturma
Bu tablo, ikinci tüketici tarafından çıkışı yazmak için kullanılır. Tablonun şema tanımı şu şekildedir: "message:STRING, timestamp:STRING".
bq mk --dataset --data_location US $PROJECT_ID:$DATASET_ID
bq mk --table $PROJECT_ID:$DATASET_ID.$TABLE_ID message:STRING,timestamp:STRING
5. Konuya mesaj göndermeye başlamak için üreticiyi çalıştırın
mvn clean install
mvn exec:java \
-Dexec.mainClass="org.kafka.SimpleKafkaProducer" \
-Dexec.args="$TOPIC"
Çıkış günlükleri şuna benzer olmalıdır:
...
Message sent: {"message":"283b7961-44cd-46d4-9061-5a22b8a1bdd7","timestamp":"2023-09-15 12:17:09"}
Message sent: {"message":"e4c2110a-ebbe-4c96-88d1-56ffdc2a3e9a","timestamp":"2023-09-15 12:17:14"}
...
6. Konudaki mesajların oturumunu konsolda kapatan ilk tüketiciyi çalıştırın
mvn clean install
mvn exec:java \
-Dexec.mainClass="org.kafka.SimpleKafkaConsumer1" \
-Dexec.args="$TOPIC"
Çıkış günlükleri aşağıdakine benzer olmalıdır:
...
Received message: {"message":"72d46b42-5014-4d28-a6e3-04b65de63826","timestamp":"2023-09-15 12:32:47"}
Received message: {"message":"631464dc-2651-4cce-826f-c9442beb3e98","timestamp":"2023-09-15 12:32:52"}
...
7. Kafka konusundan bir BigQuery tablosuna mesaj yazan ikinci tüketiciyi çalıştırma
mvn clean install
mvn exec:java \
-Dexec.mainClass="org.kafka.SimpleKafkaConsumer2" \
-Dexec.args="$TOPIC $PROJECT_ID $DATASET_ID $TABLE_ID"
Çıkış günlükleri aşağıdakine benzer olmalıdır:
...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...
8. Mesajların GCP Console'da BigQuery'ye başarıyla yazıldığını doğrulayın

4. Pubsub'u kurun
1. Pubsub'u etkinleştir
gcloud services enable pubsub.googleapis.com
2. Pubsub Konusu Oluştur
Bu konu zaman içinde kafka konusunun yerini alacaktır. Basit olması için kafka konusuyla aynı adı kullanabiliriz
export TOPIC = "my-topic"
gcloud pubsub topics create $TOPIC
5. Aşamalı Taşıma
Artık Kafka uygulamamızı yapılandırdığımıza ve taşıma için bir Pub/Sub konumuzu koyduğumuza göre, Kafka'dan Pub/Sub'a geçiş işlemine devam edeceğiz.
Bu taşıma demosunda, kafka altyapınızı aşamalı olarak taşımanızı sağlayan Google Cloud Pub/Sub Grubu'nun Pub/Sub Kafka Connector'ını kullanacağız.
1. Aşama
Pub/Sub bağlayıcısını Kafka konusundan tüm mesajları Pub/Sub konusuna yönlendirecek şekilde yapılandırma
1. Bağlayıcı deposunu oluşturarak kafka-pubsub bağlayıcı kavanozunu edinin
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector
cd java-pubsub-group-kafka-connector/
mvn clean package -DskipTests=True
Oluşturulan jar, target/pubsub-group-kafka-connector-${VERSION}.jar adresinde başarıyla görünecektir.
Jarza giden tam yolu kullanarak bir değişken oluşturun.
export KAFKA_CONNECT_JAR="path/to/target/pubsub-group-kafka-connector-${VERSION}.jar"
2. Yüklü Kafka yapılandırmalarınızı Kafka Connect yapılandırmalarıyla güncelleme
Dizini, önceki kafka indirme klasörünüzle değiştirin
cd kafka_2.13-3.5.1
Kafka indirme klasöründe /config/connect-standalone.properties dosyasını açın, indirilen bağlayıcı jar'ın dosya yolunu eklenti.path dosyasına ekleyin ve gerekirse satırın açıklamasını kaldırın. Alternatif olarak aşağıdaki cmd komutunu çalıştırabilirsiniz
echo "plugin.path=$KAFKA_CONNECT_JAR" >> config/connect-standalone.properties
3. Taşıma için gereken kafka konusu, pubsub projesi ve pubsub konusunu içeren bir CloudPubSubSinkConnector yapılandırma dosyası oluşturun. Örnek CloudPubSubSinkConnector yapılandırma dosyasını burada bulabilirsiniz.
cat <<EOF > config/cps-sink-connector.properties
name=CPSSinkConnector
connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
tasks.max=10
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
topics=$TOPIC
cps.project=$PROJECT_ID
cps.topic=$TOPIC
EOF
4. Kafka konusundan Pub/Sub'a mesaj yönlendirmeye başlamak için bağlayıcıyı başlatın
bin/connect-standalone.sh \
config/connect-standalone.properties \
config/cps-sink-connector.properties
GCP Console'da mesajların Pub/Sub konunuza yönlendirildiğini onaylayın
2. Aşama
Üreticiniz Kafka'da mesaj yayınlamaya devam ederken tüketici uygulamalarını Pub/Sub konusundan mesaj alacak şekilde güncelleme
1. Mesajları konsola yazdıran tüketiciyi güncelleyerek Pub/Sub'a abone olmasını sağlayın. Örnekte kafka-to-pubsub-demo src, SimplePubsubscriber1 Pubsub konusundan okunacak şekilde güncellenmiştir.
Pub/Sub aboneliği oluşturma
export SUBSCRIPTION_ID="sub1"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC
Güncellenen abone uygulamasını çalıştırma
cd kafka-to-pubsub-demo
mvn exec:java \
-Dexec.mainClass="org.pubsub.SimplePubsubSubscriber1" \
-Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID"
Çıkış günlükleri şuna benzer olmalıdır:
...
Id: 8827699929893588
Data: {"message":"08afe1db-2ace-466d-bcf9-77ffc80a7f58","timestamp":"2023-09-15 15:57:34"}
Id: 8827853608001203
Data: {"message":"557960f7-5f2e-4156-84de-e270127c99de","timestamp":"2023-09-15 15:57:39"}
...
2. BigQuery'ye yazan tüketiciyi Pub/Sub'a abone olmak için güncelleyin. Örnekte kafka-to-pubsub-demo src, SimplePubsubscriber1 Pubsub konusundan okunacak şekilde güncellenmiştir.
Pub/Sub aboneliği oluşturma
export SUBSCRIPTION_ID="sub2"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC
Güncellenen abone uygulamasını çalıştırma
cd kafka-to-pubsub-demo
mvn exec:java \
-Dexec.mainClass="org.pubsub.SimplePubsubSubscriber2" \
-Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID $DATASET_ID $TABLE_ID"
Çıkış günlükleri şuna benzer olmalıdır:
...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...
3. Aşama
Üreticilerinizi doğrudan Pub/Sub'da yayınlayacak şekilde güncelleme
- Kafka yerine Pub/Sub'a yazmak için Kafka üretici src'yi güncelleyin. Örnek
kafka-to-pubsub-demosrc'deSimplePubsubPublisher, Pubsub konusuna mesaj göndermek için güncellenmiştir. - Bağlayıcıyı durdurun. kafka-connect terminal oturumunda çalışan bağlayıcıyı sonlandırarak bağlayıcıyı durdurabilirsiniz.
- Güncellenen yayıncı uygulamasını çalıştırma
cd kafka-to-pubsub-demo
mvn exec:java \
-Dexec.mainClass="org.pubsub.SimplePubsubPublisher" \
-Dexec.args="$PROJECT_ID $TOPIC"
6. Tebrikler
Tebrikler, kendi kendini yöneten Kafka uygulamalarını Pub/Sub'a taşıma konusunda codelab'i başarıyla tamamladınız.
Daha fazla bilgi için bazı bağlantılar