Informazioni su questo codelab
1. Introduzione
Questo codelab è una guida passo passo per dimostrare la migrazione delle applicazioni da Apache Kafka a Google Cloud Pub/Sub utilizzando l'approccio di migrazione per fasi.
Puoi scoprire di più sulle differenze tra Kafka e Pub/Sub e l'approccio alla migrazione per fasi qui.
Cosa creerai
In questa demo imparerai a:
- Configura un cluster Kafka autogestito su GCE
- Esegui il deployment di una semplice applicazione Kafka che invia al flusso di stringhe casuali.
- Configura Pub/Sub
- Eseguire la migrazione da Kafka a Pub/Sub utilizzando il connettore Kafka di Pub/Sub
Obiettivi didattici
- Come configurare un cluster Kafka autogestito su GCE
- Eseguire la migrazione di un'applicazione Kafka a un'applicazione Pub/Sub
Che cosa ti serve
- Accedi alla piattaforma Google Cloud (con autorizzazioni di scrittura per BigQuery e Pub/Sub).
- gcloud CLI installato.
- Java 8 o versioni successive installato.
Costo
In questo documento, utilizzerai i seguenti prodotti/servizi fatturabili:
Per generare una stima dei costi in base all'utilizzo previsto, utilizza il Calcolatore prezzi.
2. Configura Kafka
In questo codelab, inizieremo a Kafka usando ZooKeeper. Nel tuo ambiente locale deve essere installato Java 8+.
1. Installa Kafka
Scarica Kafka ed estrailo. Consiglia il download binario da seguire:
curl -O https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz
tar -xzf kafka_2.13-3.5.1.tgz
cd kafka_2.13-3.5.1
2. Avvia zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
3. Avvia broker
Per avviare il servizio broker Kafka, apri un'altra sessione del terminale ed esegui:
bin/kafka-server-start.sh config/server.properties
4. Crea argomento Kafka
Crea un argomento Kafka per l'applicazione Kafka, apri una nuova sessione di terminale ed esegui:
export TOPIC= "my-topic"
bin/kafka-topics.sh --create --topic $TOPIC --bootstrap-server localhost:9092
5. Conferma creazione argomento
bin/kafka-topics.sh --describe --topic $TOPIC --bootstrap-server localhost:9092
L'output del comando precedente dovrebbe essere simile al seguente:
Topic: my-topic TopicId: gf4ena9rSmyQXMTDj1bBmQ PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: my-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
3. Crea un'applicazione Kafka
In questo codelab, creeremo un'applicazione Java Kafka che ha 1 producer e 2 consumer. Il produttore invia periodicamente stringhe casuali e un timestamp a un argomento kafka.
Per dimostrare la migrazione per fasi, creeremo due consumer per questa applicazione.
- Consumer 1 - Stampa i messaggi letti
- Consumatore 2 - Scrive i messaggi in BigQuery
Apri un nuovo terminale ed esegui questi comandi. Non eseguire questi comandi nella directory di download di Kafka
1. Imposta variabili costanti
export PROJECT_ID="<your project id>"
export DATASET_ID="<dataset name>"
export TABLE_ID="<table name>"
export TOPIC="my-topic"
2. Scarica l'applicazione Kafka src
git clone https://github.com/itodotimothy6/kafka-to-pubsub-demo.git
cd kafka-to-pubsub-demo
3. Configura e autentica gcloud
gcloud config set project $PROJECT_ID
gcloud auth application-default login
gcloud services enable bigquery.googleapis.com
4. Crea una tabella BigQuery
Questa tabella viene utilizzata dal secondo consumer per scrivere l'output. La definizione dello schema della tabella è "message:STRING, timestamp:STRING".
bq mk --dataset --data_location US $PROJECT_ID:$DATASET_ID
bq mk --table $PROJECT_ID:$DATASET_ID.$TABLE_ID message:STRING,timestamp:STRING
5. Esegui il producer per iniziare a inviare messaggi all'argomento
mvn clean install
mvn exec:java \
-Dexec.mainClass="org.kafka.SimpleKafkaProducer" \
-Dexec.args="$TOPIC"
I log di output dovrebbero avere un aspetto simile a questo:
...
Message sent: {"message":"283b7961-44cd-46d4-9061-5a22b8a1bdd7","timestamp":"2023-09-15 12:17:09"}
Message sent: {"message":"e4c2110a-ebbe-4c96-88d1-56ffdc2a3e9a","timestamp":"2023-09-15 12:17:14"}
...
6. Esegui il primo consumer che disconnette i messaggi dell'argomento nella console
mvn clean install
mvn exec:java \
-Dexec.mainClass="org.kafka.SimpleKafkaConsumer1" \
-Dexec.args="$TOPIC"
I log di output dovrebbero avere un aspetto simile a questo:
...
Received message: {"message":"72d46b42-5014-4d28-a6e3-04b65de63826","timestamp":"2023-09-15 12:32:47"}
Received message: {"message":"631464dc-2651-4cce-826f-c9442beb3e98","timestamp":"2023-09-15 12:32:52"}
...
7. Esegui il secondo consumer che scrive i messaggi dall'argomento Kafka in una tabella BigQuery
mvn clean install
mvn exec:java \
-Dexec.mainClass="org.kafka.SimpleKafkaConsumer2" \
-Dexec.args="$TOPIC $PROJECT_ID $DATASET_ID $TABLE_ID"
I log di output dovrebbero avere un aspetto simile a questo:
...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...
8. Verifica che i messaggi vengano scritti correttamente in BigQuery nella console di Google Cloud
4. Configura Pub/Sub
1. Abilita Pub/Sub
gcloud services enable pubsub.googleapis.com
2. Crea argomento Pub/Sub
Questo argomento sostituirà l'argomento kafka. Per semplicità, possiamo utilizzare lo stesso nome dell'argomento kafka
export TOPIC = "my-topic"
gcloud pubsub topics create $TOPIC
5. Migrazione per fasi
Ora che abbiamo configurato l'applicazione Kafka e abbiamo configurato un argomento Pub/Sub per la migrazione, procederemo con la migrazione da Kafka a Pub/Sub.
In questa demo della migrazione utilizzeremo il connettore Pub/Sub Kafka del gruppo Google Cloud Pub/Sub, che consente di eseguire la migrazione dell'infrastruttura Kafka in più fasi.
Fase 1
Configura il connettore Pub/Sub per inoltrare tutti i messaggi dall'argomento Kafka all'argomento Pub/Sub
1. Acquisisci il barattolo del connettore kafka-to-pubsub creando il repository del connettore
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector
cd java-pubsub-group-kafka-connector/
mvn clean package -DskipTests=True
Se l'esito è positivo, dovresti vedere il jar risultante in target/pubsub-group-kafka-connector-${VERSION}.jar
.
Crea una variabile con il percorso completo del jar.
export KAFKA_CONNECT_JAR="path/to/target/pubsub-group-kafka-connector-${VERSION}.jar"
2. Aggiorna le configurazioni Kafka installate con le configurazioni Kafka Connect
Passa alla cartella di download kafka precedente
cd kafka_2.13-3.5.1
Apri /config/connect-standalone.properties
nella cartella di download Kafka e aggiungi il percorso file del jar del connettore scaricato a plugin.path e rimuovi il commento dalla riga, se necessario. In alternativa, puoi eseguire il comando cmd
echo "plugin.path=$KAFKA_CONNECT_JAR" >> config/connect-standalone.properties
3. Crea un file di configurazione CloudPubSubSinkConnector
con l'argomento kafka, il progetto pubsub e l'argomento pubsub necessari per la migrazione. Guarda il file di configurazione CloudPubSubSinkConnector
di esempio qui.
cat <<EOF > config/cps-sink-connector.properties
name=CPSSinkConnector
connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
tasks.max=10
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
topics=$TOPIC
cps.project=$PROJECT_ID
cps.topic=$TOPIC
EOF
4. Avvia il connettore per iniziare a inoltrare messaggi dall'argomento Kafka a Pub/Sub
bin/connect-standalone.sh \
config/connect-standalone.properties \
config/cps-sink-connector.properties
Verifica nella console di Google Cloud che i messaggi siano inoltrati all'argomento Pub/Sub
Fase 2
Aggiorna le applicazioni consumer per ricevere messaggi dall'argomento Pub/Sub, mentre il tuo producer continua a pubblicare messaggi in Kafka
1. Aggiorna il consumer che stampa i messaggi nella console per eseguire la sottoscrizione a Pub/Sub. Nell'esempio di kafka-to-pubsub-demo
src, SimplePubsubscriber1
viene aggiornato in modo che venga letto dall'argomento Pub/Sub.
crea una sottoscrizione Pub/Sub
export SUBSCRIPTION_ID="sub1"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC
Esegui l'applicazione aggiornata dell'abbonato
cd kafka-to-pubsub-demo
mvn exec:java \
-Dexec.mainClass="org.pubsub.SimplePubsubSubscriber1" \
-Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID"
I log di output dovrebbero essere simili a
...
Id: 8827699929893588
Data: {"message":"08afe1db-2ace-466d-bcf9-77ffc80a7f58","timestamp":"2023-09-15 15:57:34"}
Id: 8827853608001203
Data: {"message":"557960f7-5f2e-4156-84de-e270127c99de","timestamp":"2023-09-15 15:57:39"}
...
2. Aggiornare il consumer che scrive in BigQuery per sottoscrivere la sottoscrizione a Pub/Sub. Nell'esempio di kafka-to-pubsub-demo
src, SimplePubsubscriber1
viene aggiornato in modo che venga letto dall'argomento Pub/Sub.
crea una sottoscrizione Pub/Sub
export SUBSCRIPTION_ID="sub2"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC
Esegui l'applicazione aggiornata dell'abbonato
cd kafka-to-pubsub-demo
mvn exec:java \
-Dexec.mainClass="org.pubsub.SimplePubsubSubscriber2" \
-Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID $DATASET_ID $TABLE_ID"
I log di output dovrebbero essere simili a
...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...
Fase 3
Aggiorna i producer per pubblicare direttamente in Pub/Sub
- Aggiorna il producer src per scrivere in Pub/Sub anziché in Kafka. Nell'esempio src
kafka-to-pubsub-demo
,SimplePubsubPublisher
viene aggiornato in modo da inviare messaggi all'argomento Pub/Sub. - Arresta il connettore. Puoi arrestare il connettore terminando il connettore in esecuzione nella sessione del terminale kafka-connect
- Esegui l'applicazione del publisher aggiornata
cd kafka-to-pubsub-demo
mvn exec:java \
-Dexec.mainClass="org.pubsub.SimplePubsubPublisher" \
-Dexec.args="$PROJECT_ID $TOPIC"
6. Complimenti
Complimenti, hai completato il codelab sulla migrazione delle applicazioni Kafka autogestite a Pub/Sub.
Ecco alcuni link per saperne di più