1. Wprowadzenie
To ćwiczenie w Codelabs zawiera szczegółowy przewodnik, który przedstawia migrację aplikacji z Apache Kafka do Google Cloud Pubsub przy użyciu metody migracji stopniowej.
Więcej informacji na temat różnic między Kafka i Pubsub oraz migracji stopniowej znajdziesz tutaj.
Co utworzysz
W tej prezentacji:
- Konfigurowanie samodzielnie zarządzanego klastra Kafka w GCE
- Wdróż prostą aplikację Kafka, która przesyła strumieniowo losowe ciągi znaków
- Skonfiguruj Pub/Sub
- Przeprowadź migrację z Kafka do Pubsub za pomocą oprogramowania sprzęgającego Pub/Sub Kafka
Czego się nauczysz
- Jak skonfigurować samodzielnie zarządzany klaster Kafka w GCE
- Jak przenieść aplikację Kafka do aplikacji Pub/Sub
Czego potrzebujesz
- Dostęp do Google Cloud Platform (z uprawnieniami do zapisu w BigQuery i Pub/Sub).
- Interfejs wiersza poleceń gcloud został zainstalowany.
- Zainstalowano język Java w wersji 8 lub nowszej.
Koszt
W tym dokumencie będziesz korzystać z następujących płatnych produktów/usług:
Aby wygenerować oszacowanie kosztów na podstawie przewidywanego wykorzystania, użyj kalkulatora cen.
2. Skonfiguruj Kafka
W ramach tego ćwiczenia z programowania zaczniemy używać Kafki z funkcją ZooKeeper. W środowisku lokalnym musi być zainstalowana aplikacja Java w wersji 8 lub nowszej.
1. Zainstaluj Kafka
Pobierz i rozpakuj Kafka. Poleć pobranie plików binarnych, wykonując te czynności:
curl -O https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz
tar -xzf kafka_2.13-3.5.1.tgz
cd kafka_2.13-3.5.1
2. Zacznij zoo
bin/zookeeper-server-start.sh config/zookeeper.properties
3. Uruchom brokera
Aby uruchomić usługę brokera Kafka, otwórz inną sesję terminala i uruchom polecenie:
bin/kafka-server-start.sh config/server.properties
4. Utwórz temat Kafka
Utwórz temat Kafka dla aplikacji Kafka, otwórz nową sesję terminala i uruchom:
export TOPIC= "my-topic"
bin/kafka-topics.sh --create --topic $TOPIC --bootstrap-server localhost:9092
5. Potwierdzanie utworzenia tematu
bin/kafka-topics.sh --describe --topic $TOPIC --bootstrap-server localhost:9092
Dane wyjściowe tego polecenia cmd powinny wyglądać podobnie do tego:
Topic: my-topic TopicId: gf4ena9rSmyQXMTDj1bBmQ PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: my-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
3. Utwórz aplikację Kafka
W ramach tego ćwiczenia w Codelabs utworzymy aplikację w Javie kafka, która ma 1 producenta i 2 konsumentów. Producent okresowo wysyła do tematu kafka losowe ciągi znaków i sygnaturę czasową.
Aby zademonstrować migrację stopniową, utworzymy dla tej aplikacji 2 klientów.
- Klient 1 – drukuje przeczytane wiadomości
- Klient 2 – zapisuje wiadomości w BigQuery
Otwórz nowy terminal i uruchom następujące polecenia. Nie uruchamiaj tych poleceń w katalogu pobierania Kafka
1. Ustawianie zmiennych stałych
export PROJECT_ID="<your project id>"
export DATASET_ID="<dataset name>"
export TABLE_ID="<table name>"
export TOPIC="my-topic"
2. Pobierz aplikację Kafka src
git clone https://github.com/itodotimothy6/kafka-to-pubsub-demo.git
cd kafka-to-pubsub-demo
3. Konfigurowanie i uwierzytelnianie gcloud
gcloud config set project $PROJECT_ID
gcloud auth application-default login
gcloud services enable bigquery.googleapis.com
4. Tworzenie tabeli BigQuery
Drugi konsument używa jej do zapisu danych wyjściowych. Definicja schematu tabeli to „message:STRING, timestamp:STRING”.
bq mk --dataset --data_location US $PROJECT_ID:$DATASET_ID
bq mk --table $PROJECT_ID:$DATASET_ID.$TABLE_ID message:STRING,timestamp:STRING
5. Uruchom producenta, aby zacząć wysyłać wiadomości do tematu
mvn clean install
mvn exec:java \
-Dexec.mainClass="org.kafka.SimpleKafkaProducer" \
-Dexec.args="$TOPIC"
Logi wyjściowe powinny wyglądać mniej więcej tak:
...
Message sent: {"message":"283b7961-44cd-46d4-9061-5a22b8a1bdd7","timestamp":"2023-09-15 12:17:09"}
Message sent: {"message":"e4c2110a-ebbe-4c96-88d1-56ffdc2a3e9a","timestamp":"2023-09-15 12:17:14"}
...
6. Uruchomienie pierwszego konsumenta, który wyloguje w konsoli wiadomości w temacie
mvn clean install
mvn exec:java \
-Dexec.mainClass="org.kafka.SimpleKafkaConsumer1" \
-Dexec.args="$TOPIC"
Logi wyjściowe powinny wyglądać mniej więcej tak:
...
Received message: {"message":"72d46b42-5014-4d28-a6e3-04b65de63826","timestamp":"2023-09-15 12:32:47"}
Received message: {"message":"631464dc-2651-4cce-826f-c9442beb3e98","timestamp":"2023-09-15 12:32:52"}
...
7. Uruchomienie drugiego konsumenta, który zapisuje wiadomości z tematu kafka, do tabeli BigQuery
mvn clean install
mvn exec:java \
-Dexec.mainClass="org.kafka.SimpleKafkaConsumer2" \
-Dexec.args="$TOPIC $PROJECT_ID $DATASET_ID $TABLE_ID"
Logi wyjściowe powinny wyglądać mniej więcej tak:
...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...
8. Sprawdź, czy wiadomości są zapisywane w BigQuery w konsoli GCP
4. Skonfiguruj PubSub
1. Włącz PubSub
gcloud services enable pubsub.googleapis.com
2. Utwórz temat Pub/Sub
Ten temat zastąpi temat kafka. Dla uproszczenia możemy użyć tej samej nazwy co w temacie kafka.
export TOPIC = "my-topic"
gcloud pubsub topics create $TOPIC
5. Migracja etapowa
Po skonfigurowaniu aplikacji Kafka i ustawieniu tematu Pub/Sub do migracji rozpoczniemy migrację z usługi Kafka do Pub/Sub.
W tej demonstracji migracji będziemy używać oprogramowania sprzęgającego Kafka Pub/Sub dostępnego w grupie Google Cloud Pub/Sub, które umożliwia stopniową migrację infrastruktury Kafka.
Faza 1
Skonfiguruj oprogramowanie sprzęgające Pub/Sub do przekazywania wszystkich wiadomości z tematu Kafka do tematu Pub/Sub
1. Zdobądź plik Jar oprogramowania sprzęgającego kafka-to-pubsub, tworząc repozytorium oprogramowania sprzęgającego
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector
cd java-pubsub-group-kafka-connector/
mvn clean package -DskipTests=True
Po pomyślnym zakończeniu operacji powinien wyświetlić się plik jar o nazwie target/pubsub-group-kafka-connector-${VERSION}.jar
.
Utwórz zmienną z pełną ścieżką do pliku jar.
export KAFKA_CONNECT_JAR="path/to/target/pubsub-group-kafka-connector-${VERSION}.jar"
2. Aktualizowanie zainstalowanych konfiguracji Kafka za pomocą konfiguracji Kafka Connect
Zmień katalog na folder pobierania Kafka z wcześniejszego folderu
cd kafka_2.13-3.5.1
Otwórz plik /config/connect-standalone.properties
w folderze pobierania Kafka i dodaj ścieżkę pliku pobranego pliku jar oprogramowania sprzęgającego do Plugin.path i w razie potrzeby usuń znacznik komentarza z wiersza. Możesz też uruchomić poniższe polecenie cmd
echo "plugin.path=$KAFKA_CONNECT_JAR" >> config/connect-standalone.properties
3. Utwórz CloudPubSubSinkConnector
plik konfiguracyjny z tematem kafka, projektem Pubsub i tematem PubSub wymaganym do migracji. Przykładowy CloudPubSubSinkConnector
plik konfiguracyjny znajdziesz tutaj.
cat <<EOF > config/cps-sink-connector.properties
name=CPSSinkConnector
connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
tasks.max=10
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
topics=$TOPIC
cps.project=$PROJECT_ID
cps.topic=$TOPIC
EOF
4. Uruchom oprogramowanie sprzęgające, aby rozpocząć przekazywanie wiadomości z tematu Kafka do Pub/Sub
bin/connect-standalone.sh \
config/connect-standalone.properties \
config/cps-sink-connector.properties
Sprawdź w konsoli GCP, czy wiadomości są przekazywane do tematu Pub/Sub
Faza 2
Zaktualizuj aplikacje dla konsumentów, aby otrzymywać wiadomości z tematu Pub/Sub, podczas gdy producent nadal będzie publikować wiadomości w Kafka
1. Zaktualizuj konsumenta wyświetlającego wiadomości w konsoli, aby zasubskrybować Pub/Sub. W przykładzie kafka-to-pubsub-demo
src SimplePubsubscriber1
jest odczytywany z tematu Pubsub.
Tworzenie subskrypcji Pub/Sub
export SUBSCRIPTION_ID="sub1"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC
Uruchom zaktualizowaną aplikację subskrybującą
cd kafka-to-pubsub-demo
mvn exec:java \
-Dexec.mainClass="org.pubsub.SimplePubsubSubscriber1" \
-Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID"
Logi wyjściowe powinny wyglądać podobnie do
...
Id: 8827699929893588
Data: {"message":"08afe1db-2ace-466d-bcf9-77ffc80a7f58","timestamp":"2023-09-15 15:57:34"}
Id: 8827853608001203
Data: {"message":"557960f7-5f2e-4156-84de-e270127c99de","timestamp":"2023-09-15 15:57:39"}
...
2. Zaktualizuj konsumenta, który zapisuje w BigQuery, aby zasubskrybować Pub/Sub. W przykładzie kafka-to-pubsub-demo
src SimplePubsubscriber1
jest odczytywany z tematu Pubsub.
Tworzenie subskrypcji Pub/Sub
export SUBSCRIPTION_ID="sub2"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC
Uruchom zaktualizowaną aplikację subskrybującą
cd kafka-to-pubsub-demo
mvn exec:java \
-Dexec.mainClass="org.pubsub.SimplePubsubSubscriber2" \
-Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID $DATASET_ID $TABLE_ID"
Logi wyjściowe powinny wyglądać podobnie do
...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...
Faza 3
Zaktualizuj producentów, aby publikować bezpośrednio w Pub/Sub
- Zaktualizuj atrybut src producenta Kafka, aby zapis w Pub/Sub zamiast w Kafka. W przykładowym
kafka-to-pubsub-demo
src tagSimplePubsubPublisher
został zaktualizowany tak, aby wysyłał wiadomości do tematu PubSub. - Zatrzymaj oprogramowanie sprzęgające. Możesz zatrzymać oprogramowanie sprzęgające, zamykając uruchomione oprogramowanie sprzęgające w sesji terminala kafka-connect
- Uruchom zaktualizowaną aplikację wydawcy
cd kafka-to-pubsub-demo
mvn exec:java \
-Dexec.mainClass="org.pubsub.SimplePubsubPublisher" \
-Dexec.args="$PROJECT_ID $TOPIC"
6. Gratulacje
Gratulujemy ukończenia szkolenia z programowania dotyczącego migracji samodzielnie zarządzanych aplikacji Kafka do Pub/Sub.
Oto kilka linków do dodatkowych informacji