Migracja z Apache Kafka do Pubsub

1. Wprowadzenie

To ćwiczenie w Codelabs zawiera szczegółowy przewodnik, który przedstawia migrację aplikacji z Apache Kafka do Google Cloud Pubsub przy użyciu metody migracji stopniowej.

Więcej informacji na temat różnic między Kafka i Pubsub oraz migracji stopniowej znajdziesz tutaj.

Co utworzysz

W tej prezentacji:

  • Konfigurowanie samodzielnie zarządzanego klastra Kafka w GCE
  • Wdróż prostą aplikację Kafka, która przesyła strumieniowo losowe ciągi znaków
  • Skonfiguruj Pub/Sub
  • Przeprowadź migrację z Kafka do Pubsub za pomocą oprogramowania sprzęgającego Pub/Sub Kafka

Czego się nauczysz

  • Jak skonfigurować samodzielnie zarządzany klaster Kafka w GCE
  • Jak przenieść aplikację Kafka do aplikacji Pub/Sub

Czego potrzebujesz

  • Dostęp do Google Cloud Platform (z uprawnieniami do zapisu w BigQuery i Pub/Sub).
  • Interfejs wiersza poleceń gcloud został zainstalowany.
  • Zainstalowano język Java w wersji 8 lub nowszej.

Koszt

W tym dokumencie będziesz korzystać z następujących płatnych produktów/usług:

Aby wygenerować oszacowanie kosztów na podstawie przewidywanego wykorzystania, użyj kalkulatora cen.

2. Skonfiguruj Kafka

W ramach tego ćwiczenia z programowania zaczniemy używać Kafki z funkcją ZooKeeper. W środowisku lokalnym musi być zainstalowana aplikacja Java w wersji 8 lub nowszej.

1. Zainstaluj Kafka

Pobierz i rozpakuj Kafka. Poleć pobranie plików binarnych, wykonując te czynności:

curl -O https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz
tar -xzf kafka_2.13-3.5.1.tgz
cd kafka_2.13-3.5.1

2. Zacznij zoo

bin/zookeeper-server-start.sh config/zookeeper.properties

3. Uruchom brokera

Aby uruchomić usługę brokera Kafka, otwórz inną sesję terminala i uruchom polecenie:

bin/kafka-server-start.sh config/server.properties

4. Utwórz temat Kafka

Utwórz temat Kafka dla aplikacji Kafka, otwórz nową sesję terminala i uruchom:

export TOPIC= "my-topic"
bin/kafka-topics.sh --create --topic $TOPIC --bootstrap-server localhost:9092

5. Potwierdzanie utworzenia tematu

bin/kafka-topics.sh --describe --topic $TOPIC --bootstrap-server localhost:9092

Dane wyjściowe tego polecenia cmd powinny wyglądać podobnie do tego:

Topic: my-topic   TopicId: gf4ena9rSmyQXMTDj1bBmQ PartitionCount: 1   ReplicationFactor: 1    Configs:
  Topic: my-topic Partition: 0    Leader: 0   Replicas: 0 Isr: 0

3. Utwórz aplikację Kafka

W ramach tego ćwiczenia w Codelabs utworzymy aplikację w Javie kafka, która ma 1 producenta i 2 konsumentów. Producent okresowo wysyła do tematu kafka losowe ciągi znaków i sygnaturę czasową.

Aby zademonstrować migrację stopniową, utworzymy dla tej aplikacji 2 klientów.

  • Klient 1 – drukuje przeczytane wiadomości
  • Klient 2 – zapisuje wiadomości w BigQuery

Otwórz nowy terminal i uruchom następujące polecenia. Nie uruchamiaj tych poleceń w katalogu pobierania Kafka

1. Ustawianie zmiennych stałych

export PROJECT_ID="<your project id>"
export DATASET_ID="<dataset name>"
export TABLE_ID="<table name>"
export TOPIC="my-topic"

2. Pobierz aplikację Kafka src

git clone https://github.com/itodotimothy6/kafka-to-pubsub-demo.git
cd kafka-to-pubsub-demo

3. Konfigurowanie i uwierzytelnianie gcloud

gcloud config set project $PROJECT_ID
gcloud auth application-default login
gcloud services enable bigquery.googleapis.com

4. Tworzenie tabeli BigQuery

Drugi konsument używa jej do zapisu danych wyjściowych. Definicja schematu tabeli to „message:STRING, timestamp:STRING”.

bq mk --dataset --data_location US $PROJECT_ID:$DATASET_ID 
bq mk --table $PROJECT_ID:$DATASET_ID.$TABLE_ID message:STRING,timestamp:STRING

5. Uruchom producenta, aby zacząć wysyłać wiadomości do tematu

mvn clean install
mvn exec:java \
  -Dexec.mainClass="org.kafka.SimpleKafkaProducer" \
  -Dexec.args="$TOPIC"

Logi wyjściowe powinny wyglądać mniej więcej tak:

...
Message sent: {"message":"283b7961-44cd-46d4-9061-5a22b8a1bdd7","timestamp":"2023-09-15 12:17:09"}
Message sent: {"message":"e4c2110a-ebbe-4c96-88d1-56ffdc2a3e9a","timestamp":"2023-09-15 12:17:14"}
...

6. Uruchomienie pierwszego konsumenta, który wyloguje w konsoli wiadomości w temacie

mvn clean install
mvn exec:java \
  -Dexec.mainClass="org.kafka.SimpleKafkaConsumer1" \
  -Dexec.args="$TOPIC"

Logi wyjściowe powinny wyglądać mniej więcej tak:

...
Received message: {"message":"72d46b42-5014-4d28-a6e3-04b65de63826","timestamp":"2023-09-15 12:32:47"}
Received message: {"message":"631464dc-2651-4cce-826f-c9442beb3e98","timestamp":"2023-09-15 12:32:52"}
...

7. Uruchomienie drugiego konsumenta, który zapisuje wiadomości z tematu kafka, do tabeli BigQuery

mvn clean install
mvn exec:java \
  -Dexec.mainClass="org.kafka.SimpleKafkaConsumer2" \
  -Dexec.args="$TOPIC $PROJECT_ID $DATASET_ID $TABLE_ID"

Logi wyjściowe powinny wyglądać mniej więcej tak:

...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...

8. Sprawdź, czy wiadomości są zapisywane w BigQuery w konsoli GCP

8734b356c59543af.png

4. Skonfiguruj PubSub

1. Włącz PubSub

gcloud services enable pubsub.googleapis.com

2. Utwórz temat Pub/Sub

Ten temat zastąpi temat kafka. Dla uproszczenia możemy użyć tej samej nazwy co w temacie kafka.

export TOPIC = "my-topic"
gcloud pubsub topics create $TOPIC

5. Migracja etapowa

Po skonfigurowaniu aplikacji Kafka i ustawieniu tematu Pub/Sub do migracji rozpoczniemy migrację z usługi Kafka do Pub/Sub.

W tej demonstracji migracji będziemy używać oprogramowania sprzęgającego Kafka Pub/Sub dostępnego w grupie Google Cloud Pub/Sub, które umożliwia stopniową migrację infrastruktury Kafka.

Faza 1

Skonfiguruj oprogramowanie sprzęgające Pub/Sub do przekazywania wszystkich wiadomości z tematu Kafka do tematu Pub/Sub

1. Zdobądź plik Jar oprogramowania sprzęgającego kafka-to-pubsub, tworząc repozytorium oprogramowania sprzęgającego

git clone https://github.com/googleapis/java-pubsub-group-kafka-connector
cd java-pubsub-group-kafka-connector/
mvn clean package -DskipTests=True

Po pomyślnym zakończeniu operacji powinien wyświetlić się plik jar o nazwie target/pubsub-group-kafka-connector-${VERSION}.jar.

Utwórz zmienną z pełną ścieżką do pliku jar.

export KAFKA_CONNECT_JAR="path/to/target/pubsub-group-kafka-connector-${VERSION}.jar"

2. Aktualizowanie zainstalowanych konfiguracji Kafka za pomocą konfiguracji Kafka Connect

Zmień katalog na folder pobierania Kafka z wcześniejszego folderu

cd kafka_2.13-3.5.1

Otwórz plik /config/connect-standalone.properties w folderze pobierania Kafka i dodaj ścieżkę pliku pobranego pliku jar oprogramowania sprzęgającego do Plugin.path i w razie potrzeby usuń znacznik komentarza z wiersza. Możesz też uruchomić poniższe polecenie cmd

echo "plugin.path=$KAFKA_CONNECT_JAR" >> config/connect-standalone.properties

3. Utwórz CloudPubSubSinkConnector plik konfiguracyjny z tematem kafka, projektem Pubsub i tematem PubSub wymaganym do migracji. Przykładowy CloudPubSubSinkConnector plik konfiguracyjny znajdziesz tutaj.

cat <<EOF > config/cps-sink-connector.properties
name=CPSSinkConnector
connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
tasks.max=10
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
topics=$TOPIC
cps.project=$PROJECT_ID
cps.topic=$TOPIC
EOF

4. Uruchom oprogramowanie sprzęgające, aby rozpocząć przekazywanie wiadomości z tematu Kafka do Pub/Sub

bin/connect-standalone.sh \
config/connect-standalone.properties \
config/cps-sink-connector.properties

Sprawdź w konsoli GCP, czy wiadomości są przekazywane do tematu Pub/Sub

Faza 2

Zaktualizuj aplikacje dla konsumentów, aby otrzymywać wiadomości z tematu Pub/Sub, podczas gdy producent nadal będzie publikować wiadomości w Kafka

1. Zaktualizuj konsumenta wyświetlającego wiadomości w konsoli, aby zasubskrybować Pub/Sub. W przykładzie kafka-to-pubsub-demo src SimplePubsubscriber1 jest odczytywany z tematu Pubsub.

Tworzenie subskrypcji Pub/Sub

export SUBSCRIPTION_ID="sub1"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC

Uruchom zaktualizowaną aplikację subskrybującą

cd kafka-to-pubsub-demo
mvn exec:java \
  -Dexec.mainClass="org.pubsub.SimplePubsubSubscriber1" \
  -Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID"

Logi wyjściowe powinny wyglądać podobnie do

...
Id: 8827699929893588
Data: {"message":"08afe1db-2ace-466d-bcf9-77ffc80a7f58","timestamp":"2023-09-15 15:57:34"}
Id: 8827853608001203
Data: {"message":"557960f7-5f2e-4156-84de-e270127c99de","timestamp":"2023-09-15 15:57:39"}
...

2. Zaktualizuj konsumenta, który zapisuje w BigQuery, aby zasubskrybować Pub/Sub. W przykładzie kafka-to-pubsub-demo src SimplePubsubscriber1 jest odczytywany z tematu Pubsub.

Tworzenie subskrypcji Pub/Sub

export SUBSCRIPTION_ID="sub2"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC

Uruchom zaktualizowaną aplikację subskrybującą

cd kafka-to-pubsub-demo
mvn exec:java \
  -Dexec.mainClass="org.pubsub.SimplePubsubSubscriber2" \
  -Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID $DATASET_ID $TABLE_ID"

Logi wyjściowe powinny wyglądać podobnie do

...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...

Faza 3

Zaktualizuj producentów, aby publikować bezpośrednio w Pub/Sub

  1. Zaktualizuj atrybut src producenta Kafka, aby zapis w Pub/Sub zamiast w Kafka. W przykładowym kafka-to-pubsub-demo src tag SimplePubsubPublisher został zaktualizowany tak, aby wysyłał wiadomości do tematu PubSub.
  2. Zatrzymaj oprogramowanie sprzęgające. Możesz zatrzymać oprogramowanie sprzęgające, zamykając uruchomione oprogramowanie sprzęgające w sesji terminala kafka-connect
  3. Uruchom zaktualizowaną aplikację wydawcy
cd kafka-to-pubsub-demo
mvn exec:java \
  -Dexec.mainClass="org.pubsub.SimplePubsubPublisher" \
  -Dexec.args="$PROJECT_ID $TOPIC"

6. Gratulacje

Gratulujemy ukończenia szkolenia z programowania dotyczącego migracji samodzielnie zarządzanych aplikacji Kafka do Pub/Sub.

Oto kilka linków do dodatkowych informacji