1. Einführung
In diesem Codelab wird Schritt für Schritt gezeigt, wie Sie Anwendungen mit dem Ansatz Phased Migration (Phasenweise Migration) von Apache Kafka zu Google Cloud Pub/Sub migrieren.
Aufgaben
In dieser Demo werden Sie:
- Selbstverwalteten Kafka-Cluster in GCE einrichten
- Einfache Kafka-Anwendung bereitstellen, die zufällige Strings streamt
- Pub/Sub einrichten
- Mit dem Pub/Sub-Kafka-Connector von Kafka zu Pub/Sub migrieren
Lerninhalte
- Selbstverwalteten Kafka-Cluster in GCE einrichten
- Kafka-Anwendung zu einer Pub/Sub-Anwendung migrieren
Voraussetzungen
- Zugriff auf die Google Cloud Platform (mit Schreibberechtigungen für BigQuery und Pub/Sub).
- Die gcloud CLI muss installiert sein.
- Java 8 oder höher ist installiert.
Kosten
In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Produkte/Dienste:
Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen.
2. Kafka einrichten
In diesem Codelab starten wir Kafka mit ZooKeeper. In Ihrer lokalen Umgebung muss Java 8 oder höher installiert sein.
1. Kafka installieren
Laden Sie Kafka herunter und extrahieren Sie die Dateien. Empfehlen Sie den Binärdownload, um die Schritte nachzuvollziehen:
curl -O https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz
tar -xzf kafka_2.13-3.5.1.tgz
cd kafka_2.13-3.5.1
2. Zookeeper starten
bin/zookeeper-server-start.sh config/zookeeper.properties
3. Broker starten
Öffnen Sie eine weitere Terminalsitzung und führen Sie Folgendes aus, um den Kafka-Broker-Dienst zu starten:
bin/kafka-server-start.sh config/server.properties
4. Kafka-Thema erstellen
Erstellen Sie ein Kafka-Thema für die Kafka-Anwendung, öffnen Sie eine neue Terminalsitzung und führen Sie Folgendes aus:
export TOPIC= "my-topic"
bin/kafka-topics.sh --create --topic $TOPIC --bootstrap-server localhost:9092
5. Erstellen des Themas bestätigen
bin/kafka-topics.sh --describe --topic $TOPIC --bootstrap-server localhost:9092
Die Ausgabe des oben genannten Befehls sollte in etwa so aussehen:
Topic: my-topic TopicId: gf4ena9rSmyQXMTDj1bBmQ PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: my-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
3. Kafka-Anwendung erstellen
In diesem Codelab erstellen wir eine Java-Kafka-Anwendung mit einem Producer und zwei Consumern. Der Producer sendet regelmäßig zufällige Strings und einen Zeitstempel an ein Kafka-Thema.
Um die phasenweise Migration zu demonstrieren, erstellen wir zwei Nutzer für diese Anwendung.
- Consumer 1 – Gibt die gelesenen Nachrichten aus
- Consumer 2: Schreibt die Nachrichten in BigQuery
Öffnen Sie ein neues Terminal und führen Sie die folgenden Befehle aus. Führen Sie diese Befehle nicht im Kafka-Downloadverzeichnis aus.
1. Konstante Variablen festlegen
export PROJECT_ID="<your project id>"
export DATASET_ID="<dataset name>"
export TABLE_ID="<table name>"
export TOPIC="my-topic"
2. Kafka-Anwendung herunterladen
git clone https://github.com/itodotimothy6/kafka-to-pubsub-demo.git
cd kafka-to-pubsub-demo
3. gcloud konfigurieren und authentifizieren
gcloud config set project $PROJECT_ID
gcloud auth application-default login
gcloud services enable bigquery.googleapis.com
4. BigQuery-Tabelle erstellen
Diese Tabelle wird vom zweiten Consumer verwendet, um die Ausgabe zu schreiben. Die Schemadefinition der Tabelle lautet „message:STRING, timestamp:STRING“.
bq mk --dataset --data_location US $PROJECT_ID:$DATASET_ID
bq mk --table $PROJECT_ID:$DATASET_ID.$TABLE_ID message:STRING,timestamp:STRING
5. Führen Sie den Producer aus, um mit dem Senden von Nachrichten an das Thema zu beginnen.
mvn clean install
mvn exec:java \
-Dexec.mainClass="org.kafka.SimpleKafkaProducer" \
-Dexec.args="$TOPIC"
Die Ausgabelogs sollten in etwa so aussehen:
...
Message sent: {"message":"283b7961-44cd-46d4-9061-5a22b8a1bdd7","timestamp":"2023-09-15 12:17:09"}
Message sent: {"message":"e4c2110a-ebbe-4c96-88d1-56ffdc2a3e9a","timestamp":"2023-09-15 12:17:14"}
...
6. Führen Sie den ersten Consumer aus, der Nachrichten im Thema in der Konsole protokolliert.
mvn clean install
mvn exec:java \
-Dexec.mainClass="org.kafka.SimpleKafkaConsumer1" \
-Dexec.args="$TOPIC"
Die Ausgabelogs sollten in etwa so aussehen:
...
Received message: {"message":"72d46b42-5014-4d28-a6e3-04b65de63826","timestamp":"2023-09-15 12:32:47"}
Received message: {"message":"631464dc-2651-4cce-826f-c9442beb3e98","timestamp":"2023-09-15 12:32:52"}
...
7. Zweiten Consumer ausführen, der Nachrichten aus dem Kafka-Thema in eine BigQuery-Tabelle schreibt
mvn clean install
mvn exec:java \
-Dexec.mainClass="org.kafka.SimpleKafkaConsumer2" \
-Dexec.args="$TOPIC $PROJECT_ID $DATASET_ID $TABLE_ID"
Die Ausgabelogs sollten in etwa so aussehen:
...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...
8. Prüfen, ob Nachrichten in der GCP Console erfolgreich in BigQuery geschrieben werden

4. Pubsub einrichten
1. PubSub aktivieren
gcloud services enable pubsub.googleapis.com
2. Pub/Sub-Thema erstellen
Dieses Thema wird das Kafka-Thema schließlich ersetzen. Der Einfachheit halber können wir denselben Namen wie für das Kafka-Thema verwenden.
export TOPIC = "my-topic"
gcloud pubsub topics create $TOPIC
5. Stufenweise Migration
Nachdem wir unsere Kafka-Anwendung eingerichtet und ein Pub/Sub-Thema für die Migration erstellt haben, fahren wir mit der Migration von Kafka zu Pub/Sub fort.
In dieser Migrationsdemo verwenden wir den Pub/Sub Kafka-Connector der Google Cloud Pub/Sub-Gruppe, mit dem Sie Ihre Kafka-Infrastruktur phasenweise migrieren können.
Phase 1
Den Pub/Sub-Connector so konfigurieren, dass alle Nachrichten aus dem Kafka-Thema an das Pub/Sub-Thema weitergeleitet werden
1. Rufen Sie die JAR-Datei des Connectors „kafka-to-pubsub“ ab, indem Sie das Connector-Repository erstellen.
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector
cd java-pubsub-group-kafka-connector/
mvn clean package -DskipTests=True
Bei Erfolg sollte die resultierende JAR-Datei unter target/pubsub-group-kafka-connector-${VERSION}.jar angezeigt werden.
Erstellen Sie eine Variable mit dem vollständigen Pfad zur JAR-Datei.
export KAFKA_CONNECT_JAR="path/to/target/pubsub-group-kafka-connector-${VERSION}.jar"
2. Installierte Kafka-Konfigurationen mit Kafka Connect-Konfigurationen aktualisieren
Wechseln Sie in das Kafka-Downloadverzeichnis von zuvor.
cd kafka_2.13-3.5.1
Öffnen Sie /config/connect-standalone.properties im Kafka-Downloadordner und fügen Sie den Dateipfad der heruntergeladenen Connector-JAR-Datei zu „plugin.path“ hinzu. Entfernen Sie bei Bedarf die Kommentarzeichen aus der Zeile. Alternativ können Sie den folgenden Befehl ausführen:
echo "plugin.path=$KAFKA_CONNECT_JAR" >> config/connect-standalone.properties
3. Erstellen Sie eine CloudPubSubSinkConnector-Konfigurationsdatei mit dem Kafka-Thema, dem Pub/Sub-Projekt und dem Pub/Sub-Thema, die für die Migration erforderlich sind. CloudPubSubSinkConnector Beispielkonfigurationsdateifinden Sie hier.
cat <<EOF > config/cps-sink-connector.properties
name=CPSSinkConnector
connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
tasks.max=10
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
topics=$TOPIC
cps.project=$PROJECT_ID
cps.topic=$TOPIC
EOF
4. Connector starten, um Nachrichten vom Kafka-Thema an Pub/Sub weiterzuleiten
bin/connect-standalone.sh \
config/connect-standalone.properties \
config/cps-sink-connector.properties
Prüfen Sie in der GCP Console, ob Nachrichten an Ihr Pub/Sub-Thema weitergeleitet werden.
Phase 2
Consumer-Anwendungen aktualisieren, damit Nachrichten von einem Pub/Sub-Thema empfangen werden, während Ihr Producer weiterhin Nachrichten in Kafka veröffentlicht
1. Aktualisieren Sie den Consumer, der Nachrichten in der Konsole ausgibt, um Pub/Sub zu abonnieren. Im Beispiel kafka-to-pubsub-demo srcSimplePubsubscriber1 wird aktualisiert, um Daten aus dem Pub/Sub-Thema zu lesen.
Pub/Sub-Abo erstellen
export SUBSCRIPTION_ID="sub1"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC
Aktualisierte Subscriber-Anwendung ausführen
cd kafka-to-pubsub-demo
mvn exec:java \
-Dexec.mainClass="org.pubsub.SimplePubsubSubscriber1" \
-Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID"
Die Ausgabelogs sollten in etwa so aussehen:
...
Id: 8827699929893588
Data: {"message":"08afe1db-2ace-466d-bcf9-77ffc80a7f58","timestamp":"2023-09-15 15:57:34"}
Id: 8827853608001203
Data: {"message":"557960f7-5f2e-4156-84de-e270127c99de","timestamp":"2023-09-15 15:57:39"}
...
2. Aktualisieren Sie den Consumer, der in BigQuery schreibt, um Pub/Sub zu abonnieren. Im Beispiel kafka-to-pubsub-demo srcSimplePubsubscriber1 wird aktualisiert, um Daten aus dem Pub/Sub-Thema zu lesen.
Pub/Sub-Abo erstellen
export SUBSCRIPTION_ID="sub2"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC
Aktualisierte Subscriber-Anwendung ausführen
cd kafka-to-pubsub-demo
mvn exec:java \
-Dexec.mainClass="org.pubsub.SimplePubsubSubscriber2" \
-Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID $DATASET_ID $TABLE_ID"
Die Ausgabelogs sollten in etwa so aussehen:
...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...
Phase 3
Producer aktualisieren, damit sie direkt in Pub/Sub veröffentlichen
- Aktualisieren Sie die Kafka-Producer-Quelle, damit Daten in Pub/Sub anstelle von Kafka geschrieben werden. Im Beispiel-
kafka-to-pubsub-demo-Quellcode wirdSimplePubsubPublisheraktualisiert, um Nachrichten an das Pub/Sub-Thema zu senden. - Beenden Sie den Connector. Sie können den Connector beenden, indem Sie den laufenden Connector in der kafka-connect-Terminalsitzung beenden.
- Aktualisierte Publisher-Anwendung ausführen
cd kafka-to-pubsub-demo
mvn exec:java \
-Dexec.mainClass="org.pubsub.SimplePubsubPublisher" \
-Dexec.args="$PROJECT_ID $TOPIC"
6. Glückwunsch
Sie haben das Codelab zur Migration von selbstverwalteten Kafka-Anwendungen zu Pub/Sub erfolgreich abgeschlossen.
Hier finden Sie einige Links mit weiteren Informationen: