1. Einführung
In diesem Codelab wird die Migration von Anwendungen von Apache Kafka zu Google Cloud Pubsub mithilfe einer phasenweisen Migration Schritt für Schritt beschrieben.
Weitere Informationen zu den Unterschieden zwischen Kafka und Pubsub sowie dem Ansatz der gestaffelten Migration finden Sie hier.
Aufgaben
In dieser Demo werden Sie:
- Selbstverwalteten Kafka-Cluster in GCE einrichten
- Einfache Kafka-Anwendung bereitstellen, die zufällige Strings streamt
- Pub/Sub einrichten
- Mit dem Pub/Sub-Kafka-Connector von Kafka zu Pub/Sub migrieren
Lerninhalte
- Selbstverwaltete Kafka-Cluster in GCE einrichten
- Kafka-Anwendung zu einer Pub/Sub-Anwendung migrieren
Voraussetzungen
- Auf die Google Cloud Platform zugreifen (mit Schreibberechtigungen für BigQuery und Pub/Sub).
- Die gcloud CLI ist installiert.
- Java 8+ installiert.
Kosten
In diesem Dokument werden Sie die folgenden kostenpflichtigen Produkte/Dienste verwenden:
Sie können mithilfe des Preisrechners eine Kostenschätzung für Ihre voraussichtliche Nutzung erstellen.
2. Kafka einrichten
In diesem Codelab starten wir Kafka mit ZooKeeper. In Ihrer lokalen Umgebung muss Java 8+ installiert sein.
1. Kafka installieren
Laden Sie Kafka herunter und extrahieren Sie es. Empfehlen Sie den Binärdownload, um Ihnen die Arbeit zu erleichtern:
curl -O https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz
tar -xzf kafka_2.13-3.5.1.tgz
cd kafka_2.13-3.5.1
2. Zookeeper starten
bin/zookeeper-server-start.sh config/zookeeper.properties
3. Broker starten
Öffnen Sie eine weitere Terminalsitzung und führen Sie den folgenden Befehl aus, um den Kafka-Broker-Dienst zu starten:
bin/kafka-server-start.sh config/server.properties
4. Kafka-Thema erstellen
Erstellen Sie ein Kafka-Thema für die Kafka-Anwendung, öffnen Sie eine neue Terminalsitzung und führen Sie folgenden Befehl aus:
export TOPIC= "my-topic"
bin/kafka-topics.sh --create --topic $TOPIC --bootstrap-server localhost:9092
5. Erstellen des Themas bestätigen
bin/kafka-topics.sh --describe --topic $TOPIC --bootstrap-server localhost:9092
Die Ausgabe des obigen cmd-Befehls sollte in etwa so aussehen:
Topic: my-topic TopicId: gf4ena9rSmyQXMTDj1bBmQ PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: my-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
3. Kafka-Anwendung erstellen
In diesem Codelab erstellen wir eine Java-Kafka-Anwendung mit 1 Producer und 2 Consumern. Der Producer sendet regelmäßig zufällige Strings und einen Zeitstempel an ein Kafka-Thema.
Zur Demonstration der phasenweisen Migration erstellen wir zwei Nutzer für diese Anwendung.
- Nutzer 1 – druckt die gelesenen Nachrichten aus
- Nutzer 2 – schreibt die Nachrichten in BigQuery
Öffnen Sie ein neues Terminal und führen Sie die folgenden Befehle aus. Führen Sie diese Befehle nicht im Kafka-Downloadverzeichnis aus.
1. Konstantenvariablen festlegen
export PROJECT_ID="<your project id>"
export DATASET_ID="<dataset name>"
export TABLE_ID="<table name>"
export TOPIC="my-topic"
2. Quelle der Kafka-Anwendung herunterladen
git clone https://github.com/itodotimothy6/kafka-to-pubsub-demo.git
cd kafka-to-pubsub-demo
3. gcloud konfigurieren und authentifizieren
gcloud config set project $PROJECT_ID
gcloud auth application-default login
gcloud services enable bigquery.googleapis.com
4. BigQuery-Tabelle erstellen
Diese Tabelle wird vom zweiten Nutzer verwendet, um die Ausgabe zu schreiben. Die Schemadefinition der Tabelle lautet "message:STRING, timestamp:STRING".
bq mk --dataset --data_location US $PROJECT_ID:$DATASET_ID
bq mk --table $PROJECT_ID:$DATASET_ID.$TABLE_ID message:STRING,timestamp:STRING
5. Führen Sie den Producer aus, um mit dem Senden von Nachrichten an das Thema zu beginnen
mvn clean install
mvn exec:java \
-Dexec.mainClass="org.kafka.SimpleKafkaProducer" \
-Dexec.args="$TOPIC"
Ausgabelogs sollten in etwa so aussehen:
...
Message sent: {"message":"283b7961-44cd-46d4-9061-5a22b8a1bdd7","timestamp":"2023-09-15 12:17:09"}
Message sent: {"message":"e4c2110a-ebbe-4c96-88d1-56ffdc2a3e9a","timestamp":"2023-09-15 12:17:14"}
...
6. Ersten Nutzer ausführen, der Nachrichten im Thema in der Console abmeldet
mvn clean install
mvn exec:java \
-Dexec.mainClass="org.kafka.SimpleKafkaConsumer1" \
-Dexec.args="$TOPIC"
Ausgabelogs sollten in etwa so aussehen:
...
Received message: {"message":"72d46b42-5014-4d28-a6e3-04b65de63826","timestamp":"2023-09-15 12:32:47"}
Received message: {"message":"631464dc-2651-4cce-826f-c9442beb3e98","timestamp":"2023-09-15 12:32:52"}
...
7. Zweiten Nutzer ausführen, der Nachrichten aus dem Kafka-Thema in eine BigQuery-Tabelle schreibt
mvn clean install
mvn exec:java \
-Dexec.mainClass="org.kafka.SimpleKafkaConsumer2" \
-Dexec.args="$TOPIC $PROJECT_ID $DATASET_ID $TABLE_ID"
Ausgabelogs sollten in etwa so aussehen:
...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...
8. Prüfen, ob Nachrichten in der GCP Console erfolgreich in BigQuery geschrieben werden

4. Pub/Sub einrichten
1. Pub/Sub aktivieren
gcloud services enable pubsub.googleapis.com
2. Pub/Sub-Thema erstellen
Dieses Thema wird schließlich das Kafka-Thema ersetzen. Der Einfachheit halber können wir denselben Namen wie für das kafka-Thema verwenden.
export TOPIC = "my-topic"
gcloud pubsub topics create $TOPIC
5. Migration in Phasen
Nachdem Sie die Kafka-Anwendung eingerichtet und ein Pub/Sub-Thema für die Migration eingerichtet haben, können Sie mit der Migration von Kafka zu Pub/Sub fortfahren.
In dieser Migrationsdemo verwenden wir den Pub/Sub-Kafka-Connector der Google Cloud Pub/Sub-Gruppe, mit dem Sie Ihre Kafka-Infrastruktur in Phasen migrieren können.
Phase 1
Pub/Sub-Connector so konfigurieren, dass alle Nachrichten vom Kafka-Thema an das Pub/Sub-Thema weitergeleitet werden
1. Besorgen Sie sich die JAR-Datei für den kafka-to-pubsub-Connector, indem Sie das Connector-Repository erstellen.
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector
cd java-pubsub-group-kafka-connector/
mvn clean package -DskipTests=True
Die resultierende JAR-Datei sollte unter target/pubsub-group-kafka-connector-${VERSION}.jar angezeigt werden.
Erstellen Sie eine Variable mit dem vollständigen Pfad zur JAR-Datei.
export KAFKA_CONNECT_JAR="path/to/target/pubsub-group-kafka-connector-${VERSION}.jar"
2. Installierte Kafka-Konfigurationen mit Kafka Connect-Konfigurationen aktualisieren
Ändern Sie das Verzeichnis in den kafka-Downloadordner von früher
cd kafka_2.13-3.5.1
Öffnen Sie /config/connect-standalone.properties im Kafka-Downloadordner, fügen Sie den Dateipfad der heruntergeladenen JAR-Datei des Connectors zu „plugin.path“ hinzu und entfernen Sie gegebenenfalls die Kommentarzeichen in der Zeile. Alternativ können Sie den folgenden cmd-Befehl ausführen:
echo "plugin.path=$KAFKA_CONNECT_JAR" >> config/connect-standalone.properties
3. Erstellen Sie eine CloudPubSubSinkConnector -Konfigurationsdatei mit dem für die Migration erforderlichen Kafka-Thema, Pub/Sub-Projekt und Pub/Sub-Thema. Eine Beispielkonfigurationsdatei CloudPubSubSinkConnector finden Sie hier.
cat <<EOF > config/cps-sink-connector.properties
name=CPSSinkConnector
connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
tasks.max=10
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
topics=$TOPIC
cps.project=$PROJECT_ID
cps.topic=$TOPIC
EOF
4. Starten Sie den Connector, um Nachrichten vom Kafka-Thema an Pub/Sub weiterzuleiten.
bin/connect-standalone.sh \
config/connect-standalone.properties \
config/cps-sink-connector.properties
Prüfen Sie in der GCP Console, ob Nachrichten an Ihr Pub/Sub-Thema weitergeleitet werden.
Phase 2
Nutzeranwendungen aktualisieren, um Nachrichten vom Pub/Sub-Thema zu empfangen, während der Producer weiterhin Nachrichten in Kafka veröffentlicht
1. Aktualisieren Sie den Nutzer, der Nachrichten zum Abonnieren von Pub/Sub an die Console ausgibt. Im Beispiel wird kafka-to-pubsub-demo src, SimplePubsubscriber1 zum Lesen aus dem Pub/Sub-Thema aktualisiert.
Pub/Sub-Abo erstellen
export SUBSCRIPTION_ID="sub1"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC
Aktualisierte Abonnentenanwendung ausführen
cd kafka-to-pubsub-demo
mvn exec:java \
-Dexec.mainClass="org.pubsub.SimplePubsubSubscriber1" \
-Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID"
Ausgabelogs sollten in etwa so aussehen:
...
Id: 8827699929893588
Data: {"message":"08afe1db-2ace-466d-bcf9-77ffc80a7f58","timestamp":"2023-09-15 15:57:34"}
Id: 8827853608001203
Data: {"message":"557960f7-5f2e-4156-84de-e270127c99de","timestamp":"2023-09-15 15:57:39"}
...
2. Aktualisieren Sie den Nutzer, der in BigQuery schreibt, um Pub/Sub zu abonnieren. Im Beispiel wird kafka-to-pubsub-demo src, SimplePubsubscriber1 zum Lesen aus dem Pub/Sub-Thema aktualisiert.
Pub/Sub-Abo erstellen
export SUBSCRIPTION_ID="sub2"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC
Aktualisierte Abonnentenanwendung ausführen
cd kafka-to-pubsub-demo
mvn exec:java \
-Dexec.mainClass="org.pubsub.SimplePubsubSubscriber2" \
-Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID $DATASET_ID $TABLE_ID"
Ausgabelogs sollten in etwa so aussehen:
...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...
Phase 3
Produzenten für die direkte Veröffentlichung in Pub/Sub aktualisieren
- Aktualisieren Sie die Kafka-Producer-Quelle, um in Pub/Sub statt in Kafka zu schreiben. Im Beispiel
kafka-to-pubsub-demosrc wirdSimplePubsubPublisheraktualisiert, um Nachrichten an das Pub/Sub-Thema zu senden. - Halten Sie den Connector an. Sie können den Connector anhalten, indem Sie den laufenden Connector in der Kafka-connect-Terminalsitzung beenden
- Aktualisierte Publisher-Anwendung ausführen
cd kafka-to-pubsub-demo
mvn exec:java \
-Dexec.mainClass="org.pubsub.SimplePubsubPublisher" \
-Dexec.args="$PROJECT_ID $TOPIC"
6. Glückwunsch
Herzlichen Glückwunsch! Sie haben das Codelab zur Migration selbstverwalteter Kafka-Anwendungen zu Pub/Sub erfolgreich abgeschlossen.
Unter den folgenden Links finden Sie weitere Informationen: