Migrazione da Apache Kafka a Pub/Sub

Migrazione da Apache Kafka a Pub/Sub

Informazioni su questo codelab

subjectUltimo aggiornamento: ott 4, 2023
account_circleScritto da: Timothy Itodo & Sri Harshini Donthineni

1. Introduzione

Questo codelab è una guida passo passo per dimostrare la migrazione delle applicazioni da Apache Kafka a Google Cloud Pub/Sub utilizzando l'approccio di migrazione per fasi.

Puoi scoprire di più sulle differenze tra Kafka e Pub/Sub e l'approccio alla migrazione per fasi qui.

Cosa creerai

In questa demo imparerai a:

  • Configura un cluster Kafka autogestito su GCE
  • Esegui il deployment di una semplice applicazione Kafka che invia al flusso di stringhe casuali.
  • Configura Pub/Sub
  • Eseguire la migrazione da Kafka a Pub/Sub utilizzando il connettore Kafka di Pub/Sub

Obiettivi didattici

  • Come configurare un cluster Kafka autogestito su GCE
  • Eseguire la migrazione di un'applicazione Kafka a un'applicazione Pub/Sub

Che cosa ti serve

  • Accedi alla piattaforma Google Cloud (con autorizzazioni di scrittura per BigQuery e Pub/Sub).
  • gcloud CLI installato.
  • Java 8 o versioni successive installato.

Costo

In questo documento, utilizzerai i seguenti prodotti/servizi fatturabili:

Per generare una stima dei costi in base all'utilizzo previsto, utilizza il Calcolatore prezzi.

2. Configura Kafka

In questo codelab, inizieremo a Kafka usando ZooKeeper. Nel tuo ambiente locale deve essere installato Java 8+.

1. Installa Kafka

Scarica Kafka ed estrailo. Consiglia il download binario da seguire:

curl -O https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz
tar
-xzf kafka_2.13-3.5.1.tgz
cd kafka_2
.13-3.5.1

2. Avvia zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

3. Avvia broker

Per avviare il servizio broker Kafka, apri un'altra sessione del terminale ed esegui:

bin/kafka-server-start.sh config/server.properties

4. Crea argomento Kafka

Crea un argomento Kafka per l'applicazione Kafka, apri una nuova sessione di terminale ed esegui:

export TOPIC= "my-topic"
bin
/kafka-topics.sh --create --topic $TOPIC --bootstrap-server localhost:9092

5. Conferma creazione argomento

bin/kafka-topics.sh --describe --topic $TOPIC --bootstrap-server localhost:9092

L'output del comando precedente dovrebbe essere simile al seguente:

Topic: my-topic   TopicId: gf4ena9rSmyQXMTDj1bBmQ PartitionCount: 1   ReplicationFactor: 1    Configs:
 
Topic: my-topic Partition: 0    Leader: 0   Replicas: 0 Isr: 0

3. Crea un'applicazione Kafka

In questo codelab, creeremo un'applicazione Java Kafka che ha 1 producer e 2 consumer. Il produttore invia periodicamente stringhe casuali e un timestamp a un argomento kafka.

Per dimostrare la migrazione per fasi, creeremo due consumer per questa applicazione.

  • Consumer 1 - Stampa i messaggi letti
  • Consumatore 2 - Scrive i messaggi in BigQuery

Apri un nuovo terminale ed esegui questi comandi. Non eseguire questi comandi nella directory di download di Kafka

1. Imposta variabili costanti

export PROJECT_ID="<your project id>"
export DATASET_ID="<dataset name>"
export TABLE_ID="<table name>"
export TOPIC="my-topic"

2. Scarica l'applicazione Kafka src

git clone https://github.com/itodotimothy6/kafka-to-pubsub-demo.git
cd kafka
-to-pubsub-demo

3. Configura e autentica gcloud

gcloud config set project $PROJECT_ID
gcloud auth application
-default login
gcloud services enable bigquery
.googleapis.com

4. Crea una tabella BigQuery

Questa tabella viene utilizzata dal secondo consumer per scrivere l'output. La definizione dello schema della tabella è "message:STRING, timestamp:STRING".

bq mk --dataset --data_location US $PROJECT_ID:$DATASET_ID 
bq mk
--table $PROJECT_ID:$DATASET_ID.$TABLE_ID message:STRING,timestamp:STRING

5. Esegui il producer per iniziare a inviare messaggi all'argomento

mvn clean install
mvn
exec:java \
 
-Dexec.mainClass="org.kafka.SimpleKafkaProducer" \
 
-Dexec.args="$TOPIC"

I log di output dovrebbero avere un aspetto simile a questo:

...
Message sent: {"message":"283b7961-44cd-46d4-9061-5a22b8a1bdd7","timestamp":"2023-09-15 12:17:09"}
Message sent: {"message":"e4c2110a-ebbe-4c96-88d1-56ffdc2a3e9a","timestamp":"2023-09-15 12:17:14"}
...

6. Esegui il primo consumer che disconnette i messaggi dell'argomento nella console

mvn clean install
mvn
exec:java \
 
-Dexec.mainClass="org.kafka.SimpleKafkaConsumer1" \
 
-Dexec.args="$TOPIC"

I log di output dovrebbero avere un aspetto simile a questo:

...
Received message: {"message":"72d46b42-5014-4d28-a6e3-04b65de63826","timestamp":"2023-09-15 12:32:47"}
Received message: {"message":"631464dc-2651-4cce-826f-c9442beb3e98","timestamp":"2023-09-15 12:32:52"}
...

7. Esegui il secondo consumer che scrive i messaggi dall'argomento Kafka in una tabella BigQuery

mvn clean install
mvn
exec:java \
 
-Dexec.mainClass="org.kafka.SimpleKafkaConsumer2" \
 
-Dexec.args="$TOPIC $PROJECT_ID $DATASET_ID $TABLE_ID"

I log di output dovrebbero avere un aspetto simile a questo:

...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...

8. Verifica che i messaggi vengano scritti correttamente in BigQuery nella console di Google Cloud

8734b356c59543af.png

4. Configura Pub/Sub

1. Abilita Pub/Sub

gcloud services enable pubsub.googleapis.com

2. Crea argomento Pub/Sub

Questo argomento sostituirà l'argomento kafka. Per semplicità, possiamo utilizzare lo stesso nome dell'argomento kafka

export TOPIC = "my-topic"
gcloud pubsub topics create $TOPIC

5. Migrazione per fasi

Ora che abbiamo configurato l'applicazione Kafka e abbiamo configurato un argomento Pub/Sub per la migrazione, procederemo con la migrazione da Kafka a Pub/Sub.

In questa demo della migrazione utilizzeremo il connettore Pub/Sub Kafka del gruppo Google Cloud Pub/Sub, che consente di eseguire la migrazione dell'infrastruttura Kafka in più fasi.

Fase 1

Configura il connettore Pub/Sub per inoltrare tutti i messaggi dall'argomento Kafka all'argomento Pub/Sub

1. Acquisisci il barattolo del connettore kafka-to-pubsub creando il repository del connettore

git clone https://github.com/googleapis/java-pubsub-group-kafka-connector
cd java
-pubsub-group-kafka-connector/
mvn clean
package -DskipTests=True

Se l'esito è positivo, dovresti vedere il jar risultante in target/pubsub-group-kafka-connector-${VERSION}.jar.

Crea una variabile con il percorso completo del jar.

export KAFKA_CONNECT_JAR="path/to/target/pubsub-group-kafka-connector-${VERSION}.jar"

2. Aggiorna le configurazioni Kafka installate con le configurazioni Kafka Connect

Passa alla cartella di download kafka precedente

cd kafka_2.13-3.5.1

Apri /config/connect-standalone.properties nella cartella di download Kafka e aggiungi il percorso file del jar del connettore scaricato a plugin.path e rimuovi il commento dalla riga, se necessario. In alternativa, puoi eseguire il comando cmd

echo "plugin.path=$KAFKA_CONNECT_JAR" >> config/connect-standalone.properties

3. Crea un file di configurazione CloudPubSubSinkConnector con l'argomento kafka, il progetto pubsub e l'argomento pubsub necessari per la migrazione. Guarda il file di configurazione CloudPubSubSinkConnector di esempio qui.

cat <<EOF > config/cps-sink-connector.properties
name
=CPSSinkConnector
connector
.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
tasks
.max=10
key
.converter=org.apache.kafka.connect.storage.StringConverter
value
.converter=org.apache.kafka.connect.converters.ByteArrayConverter
topics
=$TOPIC
cps
.project=$PROJECT_ID
cps
.topic=$TOPIC
EOF

4. Avvia il connettore per iniziare a inoltrare messaggi dall'argomento Kafka a Pub/Sub

bin/connect-standalone.sh \
config
/connect-standalone.properties \
config
/cps-sink-connector.properties

Verifica nella console di Google Cloud che i messaggi siano inoltrati all'argomento Pub/Sub

Fase 2

Aggiorna le applicazioni consumer per ricevere messaggi dall'argomento Pub/Sub, mentre il tuo producer continua a pubblicare messaggi in Kafka

1. Aggiorna il consumer che stampa i messaggi nella console per eseguire la sottoscrizione a Pub/Sub. Nell'esempio di kafka-to-pubsub-demo src, SimplePubsubscriber1 viene aggiornato in modo che venga letto dall'argomento Pub/Sub.

crea una sottoscrizione Pub/Sub

export SUBSCRIPTION_ID="sub1"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID
--topic=$TOPIC

Esegui l'applicazione aggiornata dell'abbonato

cd kafka-to-pubsub-demo
mvn
exec:java \
 
-Dexec.mainClass="org.pubsub.SimplePubsubSubscriber1" \
 
-Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID"

I log di output dovrebbero essere simili a

...
Id: 8827699929893588
Data: {"message":"08afe1db-2ace-466d-bcf9-77ffc80a7f58","timestamp":"2023-09-15 15:57:34"}
Id: 8827853608001203
Data: {"message":"557960f7-5f2e-4156-84de-e270127c99de","timestamp":"2023-09-15 15:57:39"}
...

2. Aggiornare il consumer che scrive in BigQuery per sottoscrivere la sottoscrizione a Pub/Sub. Nell'esempio di kafka-to-pubsub-demo src, SimplePubsubscriber1 viene aggiornato in modo che venga letto dall'argomento Pub/Sub.

crea una sottoscrizione Pub/Sub

export SUBSCRIPTION_ID="sub2"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID
--topic=$TOPIC

Esegui l'applicazione aggiornata dell'abbonato

cd kafka-to-pubsub-demo
mvn
exec:java \
 
-Dexec.mainClass="org.pubsub.SimplePubsubSubscriber2" \
 
-Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID $DATASET_ID $TABLE_ID"

I log di output dovrebbero essere simili a

...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...

Fase 3

Aggiorna i producer per pubblicare direttamente in Pub/Sub

  1. Aggiorna il producer src per scrivere in Pub/Sub anziché in Kafka. Nell'esempio src kafka-to-pubsub-demo, SimplePubsubPublisher viene aggiornato in modo da inviare messaggi all'argomento Pub/Sub.
  2. Arresta il connettore. Puoi arrestare il connettore terminando il connettore in esecuzione nella sessione del terminale kafka-connect
  3. Esegui l'applicazione del publisher aggiornata
cd kafka-to-pubsub-demo
mvn
exec:java \
 
-Dexec.mainClass="org.pubsub.SimplePubsubPublisher" \
 
-Dexec.args="$PROJECT_ID $TOPIC"

6. Complimenti

Complimenti, hai completato il codelab sulla migrazione delle applicazioni Kafka autogestite a Pub/Sub.

Ecco alcuni link per saperne di più