1. Introdução
Este codelab é um guia explicativo para demonstrar a migração de aplicativos do Apache Kafka para o Google Cloud Pubsub usando a abordagem de migração em fases.
Saiba mais sobre as diferenças entre o Kafka e o Pubsub e a abordagem de migração em fases aqui.
O que você vai criar
Nesta demonstração, você vai:
- Configurar um cluster Kafka autogerenciado no GCE
- Implante um aplicativo Kafka simples que faz streaming de strings aleatórias
- Configurar o Pub/Sub
- Migrar do Kafka para o Pubsub usando o conector Kafka do Pub/Sub
O que você vai aprender
- Como configurar um cluster Kafka autogerenciado no GCE
- Como migrar um aplicativo Kafka para um aplicativo do Pub/Sub
O que é necessário
- Acessar o Google Cloud Platform (com permissões de gravação para BigQuery e Pub/Sub).
- CLI gcloud instalada
- Java 8 ou superior instalado.
Custo
Neste documento, você usará os seguintes produtos/serviços faturáveis:
Para gerar uma estimativa de custo com base no uso previsto, use a calculadora de preços.
2. Configurar o Kafka
Neste codelab, iniciaremos o Kafka usando o ZooKeeper. Seu ambiente local precisa ter o Java 8+ instalado.
1. Instale o Kafka
Faça o download do Kafka e extraia-o. Recomende o download do binário para acompanhar:
curl -O https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz
tar -xzf kafka_2.13-3.5.1.tgz
cd kafka_2.13-3.5.1
2. Iniciar zoológico
bin/zookeeper-server-start.sh config/zookeeper.properties
3. Iniciar agente
Para iniciar o serviço do agente do Kafka, abra outra sessão do terminal e execute:
bin/kafka-server-start.sh config/server.properties
4. Criar tópico kafka
Crie um tópico Kafka para o aplicativo Kafka, abra uma nova sessão do terminal e execute:
export TOPIC= "my-topic"
bin/kafka-topics.sh --create --topic $TOPIC --bootstrap-server localhost:9092
5. Confirmar criação do tópico
bin/kafka-topics.sh --describe --topic $TOPIC --bootstrap-server localhost:9092
A saída do cmd acima será semelhante a esta:
Topic: my-topic TopicId: gf4ena9rSmyQXMTDj1bBmQ PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: my-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
3. Criar um aplicativo Kafka
Neste codelab, criaremos um aplicativo java kafka que tem um produtor e dois consumidores. O produtor envia periodicamente strings aleatórias e um carimbo de data/hora para um tópico kafka.
Para demonstrar a migração em fases, criaremos dois consumidores para o aplicativo.
- Consumidor 1: imprime as mensagens lidas
- Consumidor 2: grava as mensagens no BigQuery
Abra um novo terminal e execute os comandos a seguir. Não execute estes comandos no diretório de download do Kafka
1. Definir variáveis constantes
export PROJECT_ID="<your project id>"
export DATASET_ID="<dataset name>"
export TABLE_ID="<table name>"
export TOPIC="my-topic"
2. Faça o download do src do aplicativo Kafka
git clone https://github.com/itodotimothy6/kafka-to-pubsub-demo.git
cd kafka-to-pubsub-demo
3. Configure e autentique a gcloud
gcloud config set project $PROJECT_ID
gcloud auth application-default login
gcloud services enable bigquery.googleapis.com
4. Criar uma tabela do BigQuery
Essa tabela é usada pelo segundo consumidor para gravar a saída. A definição do esquema da tabela é "message:STRING, timestamp:STRING".
bq mk --dataset --data_location US $PROJECT_ID:$DATASET_ID
bq mk --table $PROJECT_ID:$DATASET_ID.$TABLE_ID message:STRING,timestamp:STRING
5. Execute o produtor para começar a enviar mensagens ao tópico
mvn clean install
mvn exec:java \
-Dexec.mainClass="org.kafka.SimpleKafkaProducer" \
-Dexec.args="$TOPIC"
Os registros de saída precisam ser semelhantes a:
...
Message sent: {"message":"283b7961-44cd-46d4-9061-5a22b8a1bdd7","timestamp":"2023-09-15 12:17:09"}
Message sent: {"message":"e4c2110a-ebbe-4c96-88d1-56ffdc2a3e9a","timestamp":"2023-09-15 12:17:14"}
...
6. Executar o primeiro consumidor que desconecta as mensagens do tópico no console
mvn clean install
mvn exec:java \
-Dexec.mainClass="org.kafka.SimpleKafkaConsumer1" \
-Dexec.args="$TOPIC"
Os registros de saída precisam ser semelhantes a:
...
Received message: {"message":"72d46b42-5014-4d28-a6e3-04b65de63826","timestamp":"2023-09-15 12:32:47"}
Received message: {"message":"631464dc-2651-4cce-826f-c9442beb3e98","timestamp":"2023-09-15 12:32:52"}
...
7. Execute o segundo consumidor que grava mensagens do tópico "kafka" em uma tabela do BigQuery
mvn clean install
mvn exec:java \
-Dexec.mainClass="org.kafka.SimpleKafkaConsumer2" \
-Dexec.args="$TOPIC $PROJECT_ID $DATASET_ID $TABLE_ID"
Os registros de saída precisam ser semelhantes a:
...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...
8. Confirme se as mensagens estão sendo gravadas no BigQuery no Console do GCP

4. Configurar o Pubsub
1. Ativar o Pub/Sub
gcloud services enable pubsub.googleapis.com
2. Criar tópico do Pub/Sub
Este tópico vai substituir o tópico kafka. Para simplificar, podemos usar o mesmo nome do tópico "kafka"
export TOPIC = "my-topic"
gcloud pubsub topics create $TOPIC
5. Migração por etapas
Agora que configuramos nosso aplicativo Kafka e temos um tópico do Pub/Sub para migrar, vamos continuar com a migração do Kafka para o Pub/Sub.
Nesta demonstração de migração, vamos usar o conector de Kafka do Pub/Sub do Google Cloud Pub/Sub, que permite migrar a infraestrutura kafka em fases.
Fase 1
Configure o conector do Pub/Sub para encaminhar todas as mensagens do tópico Kafka para o tópico do Pub/Sub
1. Adquira o jar do conector kafka-to-pubsub criando o repositório do conector
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector
cd java-pubsub-group-kafka-connector/
mvn clean package -DskipTests=True
Você verá o jar resultante em target/pubsub-group-kafka-connector-${VERSION}.jar.
Crie uma variável com o caminho completo para o jar.
export KAFKA_CONNECT_JAR="path/to/target/pubsub-group-kafka-connector-${VERSION}.jar"
2. Atualizar as configurações do Kafka instaladas com as configurações do Kafka Connect
Mude o diretório para a pasta de downloads do Kafka anterior.
cd kafka_2.13-3.5.1
Abra /config/connect-standalone.properties na pasta de download do Kafka e adicione o caminho de arquivo do jar do conector baixado em plugin.path e remova a marca de comentário da linha, se necessário. Como alternativa, você pode executar o cmd abaixo
echo "plugin.path=$KAFKA_CONNECT_JAR" >> config/connect-standalone.properties
3. Crie um arquivo de configuração CloudPubSubSinkConnector com o tópico kafka, o projeto pubsub e o tópico pubsub necessários para a migração. Confira um exemplo de CloudPubSubSinkConnector arquivo de configuração aqui.
cat <<EOF > config/cps-sink-connector.properties
name=CPSSinkConnector
connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
tasks.max=10
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
topics=$TOPIC
cps.project=$PROJECT_ID
cps.topic=$TOPIC
EOF
4. Inicie o conector para começar a encaminhar mensagens do tópico Kafka para o Pub/Sub
bin/connect-standalone.sh \
config/connect-standalone.properties \
config/cps-sink-connector.properties
No Console do GCP, confirme se as mensagens estão sendo encaminhadas para seu tópico do Pub/Sub
Fase 2
Atualize os aplicativos do consumidor para receber mensagens do tópico do Pub/Sub enquanto seu produtor continua publicando mensagens no Kafka
1. Atualizar o consumidor que imprime as mensagens no console para assinar o Pub/Sub. Na amostra kafka-to-pubsub-demo src, SimplePubsubscriber1 é atualizado para ler o tópico do Pubsub.
Criar uma assinatura no Pub/Sub
export SUBSCRIPTION_ID="sub1"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC
Executar aplicativo de assinante atualizado
cd kafka-to-pubsub-demo
mvn exec:java \
-Dexec.mainClass="org.pubsub.SimplePubsubSubscriber1" \
-Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID"
Os registros de saída precisam ser semelhantes a
...
Id: 8827699929893588
Data: {"message":"08afe1db-2ace-466d-bcf9-77ffc80a7f58","timestamp":"2023-09-15 15:57:34"}
Id: 8827853608001203
Data: {"message":"557960f7-5f2e-4156-84de-e270127c99de","timestamp":"2023-09-15 15:57:39"}
...
2. Atualizar o consumidor que grava no BigQuery para assinar o Pub/Sub. No exemplo de kafka-to-pubsub-demo src, SimplePubsubscriber1 é atualizado para ler o tópico Pubsub.
Criar uma assinatura no Pub/Sub
export SUBSCRIPTION_ID="sub2"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC
Executar aplicativo de assinante atualizado
cd kafka-to-pubsub-demo
mvn exec:java \
-Dexec.mainClass="org.pubsub.SimplePubsubSubscriber2" \
-Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID $DATASET_ID $TABLE_ID"
Os registros de saída precisam ser semelhantes a
...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...
Fase 3
Atualize seus produtores para publicar diretamente no Pub/Sub
- Atualize o src do produtor do Kafka para gravar no Pub/Sub em vez do Kafka. No src
kafka-to-pubsub-demode amostra,SimplePubsubPublisheré atualizado para enviar mensagens ao tópico do Pub/Sub. - Interrompa o conector. Para parar o conector, encerre o conector em execução na sessão do terminal kafka-connect
- Executar o aplicativo atualizado do editor
cd kafka-to-pubsub-demo
mvn exec:java \
-Dexec.mainClass="org.pubsub.SimplePubsubPublisher" \
-Dexec.args="$PROJECT_ID $TOPIC"
6. Parabéns
Parabéns, você concluiu o codelab sobre como migrar aplicativos Kafka autogerenciados para o Pub/Sub.
Aqui estão alguns links para mais informações