Apache Kafka에서 Pubsub로 마이그레이션

Apache Kafka에서 Pubsub로 마이그레이션

이 Codelab 정보

subject최종 업데이트: 10월 4, 2023
account_circle작성자: Timothy Itodo & Sri Harshini Donthineni

1. 소개

이 Codelab은 단계적 이전 방식을 사용하여 Apache Kafka에서 Google Cloud Pubsub로 애플리케이션을 이전하는 방법을 보여주는 단계별 안내입니다.

Kafka와 Pubsub의 차이점과 단계적 마이그레이션 접근 방식의 차이점은 여기에서 확인할 수 있습니다.

빌드할 항목

이 데모에서 수행할 작업은 다음과 같습니다.

  • GCE에서 자체 관리형 Kafka 클러스터 설정
  • 임의의 문자열을 스트리밍하는 간단한 Kafka 애플리케이션 배포
  • Pub/Sub 설정
  • Pub/Sub Kafka 커넥터를 사용하여 Kafka에서 Pubsub로 이전

학습할 내용

  • GCE에서 자체 관리형 Kafka 클러스터를 설정하는 방법
  • Kafka 애플리케이션을 Pub/Sub 애플리케이션으로 마이그레이션하는 방법

필요한 항목

  • Google Cloud Platform에 액세스합니다 (BigQuery 및 Pub/Sub에 대한 쓰기 권한 포함).
  • gcloud CLI 설치됨
  • 자바 8+가 설치됨

비용

이 문서에서는 다음과 같은 청구 가능한 제품/서비스를 사용합니다.

예상 사용량을 기준으로 예상 비용을 산출하려면 가격 계산기를 사용하세요.

2. Kafka 설정

이 Codelab에서는 ZooKeeper를 사용하여 Kafka를 시작합니다. 로컬 환경에는 자바 8 이상이 설치되어 있어야 합니다.

1. Kafka 설치

Kafka를 다운로드하고 압축을 풉니다. 바이너리 다운로드를 따르도록 권장합니다.

curl -O https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz
tar
-xzf kafka_2.13-3.5.1.tgz
cd kafka_2
.13-3.5.1

2. 동물원 사육사 시작

bin/zookeeper-server-start.sh config/zookeeper.properties

3. 브로커 시작

Kafka 브로커 서비스를 시작하려면 다른 터미널 세션을 열고 다음을 실행합니다.

bin/kafka-server-start.sh config/server.properties

4. Kafka 주제 만들기

Kafka 애플리케이션의 Kafka 주제를 만들고 새 터미널 세션을 연 후 다음을 실행합니다.

export TOPIC= "my-topic"
bin
/kafka-topics.sh --create --topic $TOPIC --bootstrap-server localhost:9092

5. 주제 만들기 확인

bin/kafka-topics.sh --describe --topic $TOPIC --bootstrap-server localhost:9092

위 cmd의 출력은 다음과 유사합니다.

Topic: my-topic   TopicId: gf4ena9rSmyQXMTDj1bBmQ PartitionCount: 1   ReplicationFactor: 1    Configs:
 
Topic: my-topic Partition: 0    Leader: 0   Replicas: 0 Isr: 0

3. Kafka 애플리케이션 만들기

이 Codelab에서는 프로듀서 1개와 소비자 2개가 있는 Java Kafka 애플리케이션을 만듭니다. 제작자는 주기적으로 임의의 문자열과 타임스탬프를 Kafka 주제에 전송합니다.

단계적 이전을 시연하기 위해 이 애플리케이션의 소비자 2개를 만들어 보겠습니다.

  • 소비자 1 - 읽은 메시지를 인쇄합니다.
  • 소비자 2 - BigQuery에 메시지 쓰기

새 터미널을 열고 다음 명령어를 실행합니다. Kafka 다운로드 디렉터리에서 다음 명령어를 실행하지 마세요.

1. 상수 변수 설정

export PROJECT_ID="<your project id>"
export DATASET_ID="<dataset name>"
export TABLE_ID="<table name>"
export TOPIC="my-topic"

2. Kafka 애플리케이션 src 다운로드

git clone https://github.com/itodotimothy6/kafka-to-pubsub-demo.git
cd kafka
-to-pubsub-demo

3. gcloud 구성 및 인증

gcloud config set project $PROJECT_ID
gcloud auth application
-default login
gcloud services enable bigquery
.googleapis.com

4. BigQuery 테이블 만들기

이 테이블은 두 번째 소비자가 출력을 작성하는 데 사용됩니다. 테이블의 스키마 정의는 'message:STRING, timestamp:STRING'입니다.

bq mk --dataset --data_location US $PROJECT_ID:$DATASET_ID 
bq mk
--table $PROJECT_ID:$DATASET_ID.$TABLE_ID message:STRING,timestamp:STRING

5. 프로듀서를 실행하여 주제로 메시지 전송을 시작합니다.

mvn clean install
mvn
exec:java \
 
-Dexec.mainClass="org.kafka.SimpleKafkaProducer" \
 
-Dexec.args="$TOPIC"

출력 로그는 다음과 유사합니다.

...
Message sent: {"message":"283b7961-44cd-46d4-9061-5a22b8a1bdd7","timestamp":"2023-09-15 12:17:09"}
Message sent: {"message":"e4c2110a-ebbe-4c96-88d1-56ffdc2a3e9a","timestamp":"2023-09-15 12:17:14"}
...

6. 주제의 메시지를 콘솔에 로그아웃하는 첫 번째 소비자 실행

mvn clean install
mvn
exec:java \
 
-Dexec.mainClass="org.kafka.SimpleKafkaConsumer1" \
 
-Dexec.args="$TOPIC"

출력 로그는 다음과 유사합니다.

...
Received message: {"message":"72d46b42-5014-4d28-a6e3-04b65de63826","timestamp":"2023-09-15 12:32:47"}
Received message: {"message":"631464dc-2651-4cce-826f-c9442beb3e98","timestamp":"2023-09-15 12:32:52"}
...

7. kafka 주제의 메시지를 BigQuery 테이블에 쓰는 두 번째 소비자 실행

mvn clean install
mvn
exec:java \
 
-Dexec.mainClass="org.kafka.SimpleKafkaConsumer2" \
 
-Dexec.args="$TOPIC $PROJECT_ID $DATASET_ID $TABLE_ID"

출력 로그는 다음과 유사합니다.

...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...

8. 메시지가 GCP 콘솔에서 BigQuery에 성공적으로 기록되는지 확인

8734b356c59543af.png

4. Pub/Sub 설정

1. Pub/Sub 사용 설정

gcloud services enable pubsub.googleapis.com

2. Pub/Sub 주제 만들기

최종적으로 이 주제가 Kafka 주제를 대체하게 됩니다. 편의상 kafka 주제와 동일한 이름을 사용할 수 있습니다.

export TOPIC = "my-topic"
gcloud pubsub topics create $TOPIC

5. 단계별 마이그레이션

이제 Kafka 애플리케이션을 설정했고 마이그레이션을 위한 Pub/Sub 주제가 준비되었으므로 Kafka에서 Pub/Sub로 마이그레이션을 진행해 보겠습니다.

이 마이그레이션 데모에서는 Kafka 인프라를 단계적으로 마이그레이션할 수 있도록 지원하는 Google Cloud Pub/Sub 그룹의 Pub/Sub Kafka 커넥터를 사용합니다.

1단계

모든 메시지를 Kafka 주제에서 Pub/Sub 주제로 전달하도록 Pub/Sub 커넥터 구성

1. 커넥터 저장소를 빌드하여 kafka-to-pubsub 커넥터 jar 가져오기

git clone https://github.com/googleapis/java-pubsub-group-kafka-connector
cd java
-pubsub-group-kafka-connector/
mvn clean
package -DskipTests=True

성공하면 target/pubsub-group-kafka-connector-${VERSION}.jar에 결과 jar가 표시됩니다.

jar의 전체 경로로 변수를 만듭니다.

export KAFKA_CONNECT_JAR="path/to/target/pubsub-group-kafka-connector-${VERSION}.jar"

2. Kafka Connect 구성으로 설치된 Kafka 구성 업데이트하기

디렉터리를 이전 kafka 다운로드 폴더로 변경합니다.

cd kafka_2.13-3.5.1

Kafka 다운로드 폴더에서 /config/connect-standalone.properties를 열고 다운로드한 커넥터 jar의 파일 경로를 plugin.path에 추가하고 필요한 경우 줄의 주석 처리를 삭제합니다. 또는 아래 cmd를 실행할 수도 있습니다.

echo "plugin.path=$KAFKA_CONNECT_JAR" >> config/connect-standalone.properties

3. 이전에 필요한 kafka 주제, Pub/Sub 프로젝트, Pub/Sub 주제로 CloudPubSubSinkConnector 구성 파일을 만듭니다. 여기에서 CloudPubSubSinkConnector 구성 파일 샘플을 확인하세요.

cat <<EOF > config/cps-sink-connector.properties
name
=CPSSinkConnector
connector
.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
tasks
.max=10
key
.converter=org.apache.kafka.connect.storage.StringConverter
value
.converter=org.apache.kafka.connect.converters.ByteArrayConverter
topics
=$TOPIC
cps
.project=$PROJECT_ID
cps
.topic=$TOPIC
EOF

4. 커넥터를 시작하여 Kafka 주제에서 Pub/Sub로 메시지 전달 시작하기

bin/connect-standalone.sh \
config
/connect-standalone.properties \
config
/cps-sink-connector.properties

GCP 콘솔에서 메시지가 Pub/Sub 주제로 전달되는지 확인

2단계

제작자가 Kafka에 메시지를 계속 게시하는 동안 Pub/Sub 주제의 메시지를 수신하도록 소비자 애플리케이션을 업데이트합니다.

1. 콘솔에 메시지를 출력하는 소비자를 업데이트하여 Pub/Sub를 구독합니다. 샘플 kafka-to-pubsub-demo src에서 SimplePubsubscriber1Pub/Sub 주제에서 읽도록 업데이트됩니다.

Pub/Sub 정기 결제 만들기

export SUBSCRIPTION_ID="sub1"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID
--topic=$TOPIC

업데이트된 구독자 애플리케이션 실행

cd kafka-to-pubsub-demo
mvn
exec:java \
 
-Dexec.mainClass="org.pubsub.SimplePubsubSubscriber1" \
 
-Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID"

출력 로그는 다음과 유사합니다.

...
Id: 8827699929893588
Data: {"message":"08afe1db-2ace-466d-bcf9-77ffc80a7f58","timestamp":"2023-09-15 15:57:34"}
Id: 8827853608001203
Data: {"message":"557960f7-5f2e-4156-84de-e270127c99de","timestamp":"2023-09-15 15:57:39"}
...

2. BigQuery에 작성하는 소비자를 업데이트하여 Pub/Sub를 구독합니다. 샘플 kafka-to-pubsub-demo src에서 SimplePubsubscriber1Pub/Sub 주제에서 읽도록 업데이트됩니다.

Pub/Sub 정기 결제 만들기

export SUBSCRIPTION_ID="sub2"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID
--topic=$TOPIC

업데이트된 구독자 애플리케이션 실행

cd kafka-to-pubsub-demo
mvn
exec:java \
 
-Dexec.mainClass="org.pubsub.SimplePubsubSubscriber2" \
 
-Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID $DATASET_ID $TABLE_ID"

출력 로그는 다음과 유사합니다.

...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...

3단계

제작자를 업데이트하여 Pub/Sub에 직접 게시

  1. Kafka 대신 Pub/Sub에 쓰도록 Kafka 제작자 src를 업데이트합니다. 샘플 kafka-to-pubsub-demo src에서는 Pub/Sub 주제로 메시지를 보내도록 SimplePubsubPublisher가 업데이트됩니다.
  2. 커넥터를 중지합니다. kafka-connect 터미널 세션에서 실행 중인 커넥터를 종료하여 커넥터를 중지할 수 있습니다.
  3. 업데이트된 게시자 애플리케이션 실행
cd kafka-to-pubsub-demo
mvn
exec:java \
 
-Dexec.mainClass="org.pubsub.SimplePubsubPublisher" \
 
-Dexec.args="$PROJECT_ID $TOPIC"

6. 축하합니다

축하합니다. 자체 관리형 Kafka 애플리케이션을 Pub/Sub로 마이그레이션하는 Codelab을 완료했습니다.

자세한 내용은 다음 링크를 참고하세요.