Migração do Apache Kafka para o Pubsub

1. Introdução

Este codelab é um guia explicativo para demonstrar a migração de aplicativos do Apache Kafka para o Google Cloud Pubsub usando a abordagem de migração em fases.

Saiba mais sobre as diferenças entre o Kafka e o Pubsub e a abordagem de migração em fases aqui.

O que você vai criar

Nesta demonstração, você vai:

  • Configurar um cluster Kafka autogerenciado no GCE
  • Implante um aplicativo Kafka simples que faz streaming de strings aleatórias
  • Configurar o Pub/Sub
  • Migrar do Kafka para o Pubsub usando o conector Kafka do Pub/Sub

O que você vai aprender

  • Como configurar um cluster Kafka autogerenciado no GCE
  • Como migrar um aplicativo Kafka para um aplicativo do Pub/Sub

O que é necessário

  • Acessar o Google Cloud Platform (com permissões de gravação para BigQuery e Pub/Sub).
  • CLI gcloud instalada
  • Java 8 ou superior instalado.

Custo

Neste documento, você usará os seguintes produtos/serviços faturáveis:

Para gerar uma estimativa de custo com base no uso previsto, use a calculadora de preços.

2. Configurar o Kafka

Neste codelab, iniciaremos o Kafka usando o ZooKeeper. Seu ambiente local precisa ter o Java 8+ instalado.

1. Instale o Kafka

Faça o download do Kafka e extraia-o. Recomende o download do binário para acompanhar:

curl -O https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz
tar -xzf kafka_2.13-3.5.1.tgz
cd kafka_2.13-3.5.1

2. Iniciar zoológico

bin/zookeeper-server-start.sh config/zookeeper.properties

3. Iniciar agente

Para iniciar o serviço do agente do Kafka, abra outra sessão do terminal e execute:

bin/kafka-server-start.sh config/server.properties

4. Criar tópico kafka

Crie um tópico Kafka para o aplicativo Kafka, abra uma nova sessão do terminal e execute:

export TOPIC= "my-topic"
bin/kafka-topics.sh --create --topic $TOPIC --bootstrap-server localhost:9092

5. Confirmar criação do tópico

bin/kafka-topics.sh --describe --topic $TOPIC --bootstrap-server localhost:9092

A saída do cmd acima será semelhante a esta:

Topic: my-topic   TopicId: gf4ena9rSmyQXMTDj1bBmQ PartitionCount: 1   ReplicationFactor: 1    Configs:
  Topic: my-topic Partition: 0    Leader: 0   Replicas: 0 Isr: 0

3. Criar um aplicativo Kafka

Neste codelab, criaremos um aplicativo java kafka que tem um produtor e dois consumidores. O produtor envia periodicamente strings aleatórias e um carimbo de data/hora para um tópico kafka.

Para demonstrar a migração em fases, criaremos dois consumidores para o aplicativo.

  • Consumidor 1: imprime as mensagens lidas
  • Consumidor 2: grava as mensagens no BigQuery

Abra um novo terminal e execute os comandos a seguir. Não execute estes comandos no diretório de download do Kafka

1. Definir variáveis constantes

export PROJECT_ID="<your project id>"
export DATASET_ID="<dataset name>"
export TABLE_ID="<table name>"
export TOPIC="my-topic"

2. Faça o download do src do aplicativo Kafka

git clone https://github.com/itodotimothy6/kafka-to-pubsub-demo.git
cd kafka-to-pubsub-demo

3. Configure e autentique a gcloud

gcloud config set project $PROJECT_ID
gcloud auth application-default login
gcloud services enable bigquery.googleapis.com

4. Criar uma tabela do BigQuery

Essa tabela é usada pelo segundo consumidor para gravar a saída. A definição do esquema da tabela é "message:STRING, timestamp:STRING".

bq mk --dataset --data_location US $PROJECT_ID:$DATASET_ID 
bq mk --table $PROJECT_ID:$DATASET_ID.$TABLE_ID message:STRING,timestamp:STRING

5. Execute o produtor para começar a enviar mensagens ao tópico

mvn clean install
mvn exec:java \
  -Dexec.mainClass="org.kafka.SimpleKafkaProducer" \
  -Dexec.args="$TOPIC"

Os registros de saída precisam ser semelhantes a:

...
Message sent: {"message":"283b7961-44cd-46d4-9061-5a22b8a1bdd7","timestamp":"2023-09-15 12:17:09"}
Message sent: {"message":"e4c2110a-ebbe-4c96-88d1-56ffdc2a3e9a","timestamp":"2023-09-15 12:17:14"}
...

6. Executar o primeiro consumidor que desconecta as mensagens do tópico no console

mvn clean install
mvn exec:java \
  -Dexec.mainClass="org.kafka.SimpleKafkaConsumer1" \
  -Dexec.args="$TOPIC"

Os registros de saída precisam ser semelhantes a:

...
Received message: {"message":"72d46b42-5014-4d28-a6e3-04b65de63826","timestamp":"2023-09-15 12:32:47"}
Received message: {"message":"631464dc-2651-4cce-826f-c9442beb3e98","timestamp":"2023-09-15 12:32:52"}
...

7. Execute o segundo consumidor que grava mensagens do tópico "kafka" em uma tabela do BigQuery

mvn clean install
mvn exec:java \
  -Dexec.mainClass="org.kafka.SimpleKafkaConsumer2" \
  -Dexec.args="$TOPIC $PROJECT_ID $DATASET_ID $TABLE_ID"

Os registros de saída precisam ser semelhantes a:

...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...

8. Confirme se as mensagens estão sendo gravadas no BigQuery no Console do GCP

8734b356c59543af.png

4. Configurar o Pubsub

1. Ativar o Pub/Sub

gcloud services enable pubsub.googleapis.com

2. Criar tópico do Pub/Sub

Este tópico vai substituir o tópico kafka. Para simplificar, podemos usar o mesmo nome do tópico "kafka"

export TOPIC = "my-topic"
gcloud pubsub topics create $TOPIC

5. Migração por etapas

Agora que configuramos nosso aplicativo Kafka e temos um tópico do Pub/Sub para migrar, vamos continuar com a migração do Kafka para o Pub/Sub.

Nesta demonstração de migração, vamos usar o conector de Kafka do Pub/Sub do Google Cloud Pub/Sub, que permite migrar a infraestrutura kafka em fases.

Fase 1

Configure o conector do Pub/Sub para encaminhar todas as mensagens do tópico Kafka para o tópico do Pub/Sub

1. Adquira o jar do conector kafka-to-pubsub criando o repositório do conector

git clone https://github.com/googleapis/java-pubsub-group-kafka-connector
cd java-pubsub-group-kafka-connector/
mvn clean package -DskipTests=True

Você verá o jar resultante em target/pubsub-group-kafka-connector-${VERSION}.jar.

Crie uma variável com o caminho completo para o jar.

export KAFKA_CONNECT_JAR="path/to/target/pubsub-group-kafka-connector-${VERSION}.jar"

2. Atualizar as configurações do Kafka instaladas com as configurações do Kafka Connect

Mude o diretório para a pasta de downloads do Kafka anterior.

cd kafka_2.13-3.5.1

Abra /config/connect-standalone.properties na pasta de download do Kafka e adicione o caminho de arquivo do jar do conector baixado em plugin.path e remova a marca de comentário da linha, se necessário. Como alternativa, você pode executar o cmd abaixo

echo "plugin.path=$KAFKA_CONNECT_JAR" >> config/connect-standalone.properties

3. Crie um arquivo de configuração CloudPubSubSinkConnector com o tópico kafka, o projeto pubsub e o tópico pubsub necessários para a migração. Confira um exemplo de CloudPubSubSinkConnector arquivo de configuração aqui.

cat <<EOF > config/cps-sink-connector.properties
name=CPSSinkConnector
connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
tasks.max=10
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
topics=$TOPIC
cps.project=$PROJECT_ID
cps.topic=$TOPIC
EOF

4. Inicie o conector para começar a encaminhar mensagens do tópico Kafka para o Pub/Sub

bin/connect-standalone.sh \
config/connect-standalone.properties \
config/cps-sink-connector.properties

No Console do GCP, confirme se as mensagens estão sendo encaminhadas para seu tópico do Pub/Sub

Fase 2

Atualize os aplicativos do consumidor para receber mensagens do tópico do Pub/Sub enquanto seu produtor continua publicando mensagens no Kafka

1. Atualizar o consumidor que imprime as mensagens no console para assinar o Pub/Sub. Na amostra kafka-to-pubsub-demo src, SimplePubsubscriber1 é atualizado para ler o tópico do Pubsub.

Criar uma assinatura no Pub/Sub

export SUBSCRIPTION_ID="sub1"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC

Executar aplicativo de assinante atualizado

cd kafka-to-pubsub-demo
mvn exec:java \
  -Dexec.mainClass="org.pubsub.SimplePubsubSubscriber1" \
  -Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID"

Os registros de saída precisam ser semelhantes a

...
Id: 8827699929893588
Data: {"message":"08afe1db-2ace-466d-bcf9-77ffc80a7f58","timestamp":"2023-09-15 15:57:34"}
Id: 8827853608001203
Data: {"message":"557960f7-5f2e-4156-84de-e270127c99de","timestamp":"2023-09-15 15:57:39"}
...

2. Atualizar o consumidor que grava no BigQuery para assinar o Pub/Sub. No exemplo de kafka-to-pubsub-demo src, SimplePubsubscriber1 é atualizado para ler o tópico Pubsub.

Criar uma assinatura no Pub/Sub

export SUBSCRIPTION_ID="sub2"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC

Executar aplicativo de assinante atualizado

cd kafka-to-pubsub-demo
mvn exec:java \
  -Dexec.mainClass="org.pubsub.SimplePubsubSubscriber2" \
  -Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID $DATASET_ID $TABLE_ID"

Os registros de saída precisam ser semelhantes a

...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...

Fase 3

Atualize seus produtores para publicar diretamente no Pub/Sub

  1. Atualize o src do produtor do Kafka para gravar no Pub/Sub em vez do Kafka. No src kafka-to-pubsub-demo de amostra, SimplePubsubPublisher é atualizado para enviar mensagens ao tópico do Pub/Sub.
  2. Interrompa o conector. Para parar o conector, encerre o conector em execução na sessão do terminal kafka-connect
  3. Executar o aplicativo atualizado do editor
cd kafka-to-pubsub-demo
mvn exec:java \
  -Dexec.mainClass="org.pubsub.SimplePubsubPublisher" \
  -Dexec.args="$PROJECT_ID $TOPIC"

6. Parabéns

Parabéns, você concluiu o codelab sobre como migrar aplicativos Kafka autogerenciados para o Pub/Sub.

Aqui estão alguns links para mais informações