이 Codelab 정보
1. 소개
이 Codelab은 단계적 이전 방식을 사용하여 Apache Kafka에서 Google Cloud Pubsub로 애플리케이션을 이전하는 방법을 보여주는 단계별 안내입니다.
Kafka와 Pubsub의 차이점과 단계적 마이그레이션 접근 방식의 차이점은 여기에서 확인할 수 있습니다.
빌드할 항목
이 데모에서 수행할 작업은 다음과 같습니다.
- GCE에서 자체 관리형 Kafka 클러스터 설정
- 임의의 문자열을 스트리밍하는 간단한 Kafka 애플리케이션 배포
- Pub/Sub 설정
- Pub/Sub Kafka 커넥터를 사용하여 Kafka에서 Pubsub로 이전
학습할 내용
- GCE에서 자체 관리형 Kafka 클러스터를 설정하는 방법
- Kafka 애플리케이션을 Pub/Sub 애플리케이션으로 마이그레이션하는 방법
필요한 항목
- Google Cloud Platform에 액세스합니다 (BigQuery 및 Pub/Sub에 대한 쓰기 권한 포함).
- gcloud CLI 설치됨
- 자바 8+가 설치됨
비용
이 문서에서는 다음과 같은 청구 가능한 제품/서비스를 사용합니다.
예상 사용량을 기준으로 예상 비용을 산출하려면 가격 계산기를 사용하세요.
2. Kafka 설정
이 Codelab에서는 ZooKeeper를 사용하여 Kafka를 시작합니다. 로컬 환경에는 자바 8 이상이 설치되어 있어야 합니다.
1. Kafka 설치
Kafka를 다운로드하고 압축을 풉니다. 바이너리 다운로드를 따르도록 권장합니다.
curl -O https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz
tar -xzf kafka_2.13-3.5.1.tgz
cd kafka_2.13-3.5.1
2. 동물원 사육사 시작
bin/zookeeper-server-start.sh config/zookeeper.properties
3. 브로커 시작
Kafka 브로커 서비스를 시작하려면 다른 터미널 세션을 열고 다음을 실행합니다.
bin/kafka-server-start.sh config/server.properties
4. Kafka 주제 만들기
Kafka 애플리케이션의 Kafka 주제를 만들고 새 터미널 세션을 연 후 다음을 실행합니다.
export TOPIC= "my-topic"
bin/kafka-topics.sh --create --topic $TOPIC --bootstrap-server localhost:9092
5. 주제 만들기 확인
bin/kafka-topics.sh --describe --topic $TOPIC --bootstrap-server localhost:9092
위 cmd의 출력은 다음과 유사합니다.
Topic: my-topic TopicId: gf4ena9rSmyQXMTDj1bBmQ PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: my-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
3. Kafka 애플리케이션 만들기
이 Codelab에서는 프로듀서 1개와 소비자 2개가 있는 Java Kafka 애플리케이션을 만듭니다. 제작자는 주기적으로 임의의 문자열과 타임스탬프를 Kafka 주제에 전송합니다.
단계적 이전을 시연하기 위해 이 애플리케이션의 소비자 2개를 만들어 보겠습니다.
- 소비자 1 - 읽은 메시지를 인쇄합니다.
- 소비자 2 - BigQuery에 메시지 쓰기
새 터미널을 열고 다음 명령어를 실행합니다. Kafka 다운로드 디렉터리에서 다음 명령어를 실행하지 마세요.
1. 상수 변수 설정
export PROJECT_ID="<your project id>"
export DATASET_ID="<dataset name>"
export TABLE_ID="<table name>"
export TOPIC="my-topic"
2. Kafka 애플리케이션 src 다운로드
git clone https://github.com/itodotimothy6/kafka-to-pubsub-demo.git
cd kafka-to-pubsub-demo
3. gcloud 구성 및 인증
gcloud config set project $PROJECT_ID
gcloud auth application-default login
gcloud services enable bigquery.googleapis.com
4. BigQuery 테이블 만들기
이 테이블은 두 번째 소비자가 출력을 작성하는 데 사용됩니다. 테이블의 스키마 정의는 'message:STRING, timestamp:STRING'입니다.
bq mk --dataset --data_location US $PROJECT_ID:$DATASET_ID
bq mk --table $PROJECT_ID:$DATASET_ID.$TABLE_ID message:STRING,timestamp:STRING
5. 프로듀서를 실행하여 주제로 메시지 전송을 시작합니다.
mvn clean install
mvn exec:java \
-Dexec.mainClass="org.kafka.SimpleKafkaProducer" \
-Dexec.args="$TOPIC"
출력 로그는 다음과 유사합니다.
...
Message sent: {"message":"283b7961-44cd-46d4-9061-5a22b8a1bdd7","timestamp":"2023-09-15 12:17:09"}
Message sent: {"message":"e4c2110a-ebbe-4c96-88d1-56ffdc2a3e9a","timestamp":"2023-09-15 12:17:14"}
...
6. 주제의 메시지를 콘솔에 로그아웃하는 첫 번째 소비자 실행
mvn clean install
mvn exec:java \
-Dexec.mainClass="org.kafka.SimpleKafkaConsumer1" \
-Dexec.args="$TOPIC"
출력 로그는 다음과 유사합니다.
...
Received message: {"message":"72d46b42-5014-4d28-a6e3-04b65de63826","timestamp":"2023-09-15 12:32:47"}
Received message: {"message":"631464dc-2651-4cce-826f-c9442beb3e98","timestamp":"2023-09-15 12:32:52"}
...
7. kafka 주제의 메시지를 BigQuery 테이블에 쓰는 두 번째 소비자 실행
mvn clean install
mvn exec:java \
-Dexec.mainClass="org.kafka.SimpleKafkaConsumer2" \
-Dexec.args="$TOPIC $PROJECT_ID $DATASET_ID $TABLE_ID"
출력 로그는 다음과 유사합니다.
...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...
8. 메시지가 GCP 콘솔에서 BigQuery에 성공적으로 기록되는지 확인
4. Pub/Sub 설정
1. Pub/Sub 사용 설정
gcloud services enable pubsub.googleapis.com
2. Pub/Sub 주제 만들기
최종적으로 이 주제가 Kafka 주제를 대체하게 됩니다. 편의상 kafka 주제와 동일한 이름을 사용할 수 있습니다.
export TOPIC = "my-topic"
gcloud pubsub topics create $TOPIC
5. 단계별 마이그레이션
이제 Kafka 애플리케이션을 설정했고 마이그레이션을 위한 Pub/Sub 주제가 준비되었으므로 Kafka에서 Pub/Sub로 마이그레이션을 진행해 보겠습니다.
이 마이그레이션 데모에서는 Kafka 인프라를 단계적으로 마이그레이션할 수 있도록 지원하는 Google Cloud Pub/Sub 그룹의 Pub/Sub Kafka 커넥터를 사용합니다.
1단계
모든 메시지를 Kafka 주제에서 Pub/Sub 주제로 전달하도록 Pub/Sub 커넥터 구성
1. 커넥터 저장소를 빌드하여 kafka-to-pubsub 커넥터 jar 가져오기
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector
cd java-pubsub-group-kafka-connector/
mvn clean package -DskipTests=True
성공하면 target/pubsub-group-kafka-connector-${VERSION}.jar
에 결과 jar가 표시됩니다.
jar의 전체 경로로 변수를 만듭니다.
export KAFKA_CONNECT_JAR="path/to/target/pubsub-group-kafka-connector-${VERSION}.jar"
2. Kafka Connect 구성으로 설치된 Kafka 구성 업데이트하기
디렉터리를 이전 kafka 다운로드 폴더로 변경합니다.
cd kafka_2.13-3.5.1
Kafka 다운로드 폴더에서 /config/connect-standalone.properties
를 열고 다운로드한 커넥터 jar의 파일 경로를 plugin.path에 추가하고 필요한 경우 줄의 주석 처리를 삭제합니다. 또는 아래 cmd를 실행할 수도 있습니다.
echo "plugin.path=$KAFKA_CONNECT_JAR" >> config/connect-standalone.properties
3. 이전에 필요한 kafka 주제, Pub/Sub 프로젝트, Pub/Sub 주제로 CloudPubSubSinkConnector
구성 파일을 만듭니다. 여기에서 CloudPubSubSinkConnector
구성 파일 샘플을 확인하세요.
cat <<EOF > config/cps-sink-connector.properties
name=CPSSinkConnector
connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
tasks.max=10
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
topics=$TOPIC
cps.project=$PROJECT_ID
cps.topic=$TOPIC
EOF
4. 커넥터를 시작하여 Kafka 주제에서 Pub/Sub로 메시지 전달 시작하기
bin/connect-standalone.sh \
config/connect-standalone.properties \
config/cps-sink-connector.properties
GCP 콘솔에서 메시지가 Pub/Sub 주제로 전달되는지 확인
2단계
제작자가 Kafka에 메시지를 계속 게시하는 동안 Pub/Sub 주제의 메시지를 수신하도록 소비자 애플리케이션을 업데이트합니다.
1. 콘솔에 메시지를 출력하는 소비자를 업데이트하여 Pub/Sub를 구독합니다. 샘플 kafka-to-pubsub-demo
src에서 SimplePubsubscriber1
는 Pub/Sub 주제에서 읽도록 업데이트됩니다.
Pub/Sub 정기 결제 만들기
export SUBSCRIPTION_ID="sub1"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC
업데이트된 구독자 애플리케이션 실행
cd kafka-to-pubsub-demo
mvn exec:java \
-Dexec.mainClass="org.pubsub.SimplePubsubSubscriber1" \
-Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID"
출력 로그는 다음과 유사합니다.
...
Id: 8827699929893588
Data: {"message":"08afe1db-2ace-466d-bcf9-77ffc80a7f58","timestamp":"2023-09-15 15:57:34"}
Id: 8827853608001203
Data: {"message":"557960f7-5f2e-4156-84de-e270127c99de","timestamp":"2023-09-15 15:57:39"}
...
2. BigQuery에 작성하는 소비자를 업데이트하여 Pub/Sub를 구독합니다. 샘플 kafka-to-pubsub-demo
src에서 SimplePubsubscriber1
는 Pub/Sub 주제에서 읽도록 업데이트됩니다.
Pub/Sub 정기 결제 만들기
export SUBSCRIPTION_ID="sub2"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC
업데이트된 구독자 애플리케이션 실행
cd kafka-to-pubsub-demo
mvn exec:java \
-Dexec.mainClass="org.pubsub.SimplePubsubSubscriber2" \
-Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID $DATASET_ID $TABLE_ID"
출력 로그는 다음과 유사합니다.
...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...
3단계
제작자를 업데이트하여 Pub/Sub에 직접 게시
- Kafka 대신 Pub/Sub에 쓰도록 Kafka 제작자 src를 업데이트합니다. 샘플
kafka-to-pubsub-demo
src에서는 Pub/Sub 주제로 메시지를 보내도록SimplePubsubPublisher
가 업데이트됩니다. - 커넥터를 중지합니다. kafka-connect 터미널 세션에서 실행 중인 커넥터를 종료하여 커넥터를 중지할 수 있습니다.
- 업데이트된 게시자 애플리케이션 실행
cd kafka-to-pubsub-demo
mvn exec:java \
-Dexec.mainClass="org.pubsub.SimplePubsubPublisher" \
-Dexec.args="$PROJECT_ID $TOPIC"