העברה מ-Apache Kafka ל-Pubsub

1. מבוא

ה-Codelab הזה הוא מדריך מפורט להדגמת העברה של אפליקציות מ-Apache Kafka ל-Google Cloud Pubsub באמצעות גישת Phased Migration.

כאן אפשר לקרוא מידע נוסף על ההבדלים בין Kafka ו-Pubsub לבין הגישה של Pased Migration.

מה תפַתחו

בהדגמה הזו:

  • הגדרה של אשכול קפקא בניהול עצמי ב-GCE
  • פריסת אפליקציית Kafka פשוטה שמשדרת בסטרימינג מחרוזות אקראיות
  • הגדרת Pub/Sub
  • עוברים מ-Kafka ל-Pubsub באמצעות מחבר Pub/Sub Kafka

מה תלמדו

  • איך להגדיר אשכול קפקא בניהול עצמי ב-GCE
  • איך מעבירים אפליקציית Kafka לאפליקציית Pub/Sub

למה תזדקק?

  • גישה ל-Google Cloud Platform (עם הרשאות כתיבה ל-BigQuery ול-Pub/Sub).
  • ה-CLI של gcloud הותקן.
  • התקנת Java 8 ומעלה.

עלות

במסמך הזה תשתמשו במוצרים או בשירותים הבאים, והשימוש בהם כרוך בתשלום:

תוכלו להשתמש במחשבון התמחור כדי ליצור הערכת עלויות בהתאם לשימוש החזוי.

2. הגדרת Kafka

ב-Codelab הזה, נתחיל את Kafka באמצעות ZoomKeeper. חובה להתקין בסביבה המקומית שלכם גרסה 8 ומעלה של Java.

1. התקנת Kafka

מורידים את Kafka ומחלצים אותו. המלץ על ההורדה הבינארית כדי לפעול לפי השלבים הבאים:

curl -O https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz
tar -xzf kafka_2.13-3.5.1.tgz
cd kafka_2.13-3.5.1

2. הפעלת שמירה בגן חיות

bin/zookeeper-server-start.sh config/zookeeper.properties

3. התחלת המתווך

כדי להפעיל את שירות המתווך של Kafka, פותחים סשן נוסף בטרמינל ומריצים את הפקודה:

bin/kafka-server-start.sh config/server.properties

4. יצירת נושא קפקא

יוצרים נושא Kafka עבור אפליקציית Kafka, פותחים סשן חדש בטרמינל ומריצים את הפקודה:

export TOPIC= "my-topic"
bin/kafka-topics.sh --create --topic $TOPIC --bootstrap-server localhost:9092

5. אישור יצירת הנושא

bin/kafka-topics.sh --describe --topic $TOPIC --bootstrap-server localhost:9092

הפלט של ה-cmd שלמעלה אמור להיראות כך:

Topic: my-topic   TopicId: gf4ena9rSmyQXMTDj1bBmQ PartitionCount: 1   ReplicationFactor: 1    Configs:
  Topic: my-topic Partition: 0    Leader: 0   Replicas: 0 Isr: 0

3. יצירת אפליקציית Kafka

ב-Codelab הזה ניצור אפליקציית Java kafka שיש לה מפיק אחד ו-2 צרכנים. מדי פעם המפיק שולח מחרוזות אקראיות וחותמת זמן לנושא קפקא.

כדי להמחיש העברה הדרגתית, ניצור שני צרכנים לאפליקציה הזו.

  • צרכן 1 – מדפיס את ההודעות שנקראו
  • צרכן 2 – כותב ההודעות ב-BigQuery

פותחים טרמינל חדש ומריצים את הפקודות הבאות. לא להריץ את הפקודות האלה בספריית ההורדות של Kafka

1. הגדרת משתנים קבועים

export PROJECT_ID="<your project id>"
export DATASET_ID="<dataset name>"
export TABLE_ID="<table name>"
export TOPIC="my-topic"

2. הורדת אפליקציית Kafka src

git clone https://github.com/itodotimothy6/kafka-to-pubsub-demo.git
cd kafka-to-pubsub-demo

3. הגדרה ואימות של gcloud

gcloud config set project $PROJECT_ID
gcloud auth application-default login
gcloud services enable bigquery.googleapis.com

4. יצירת טבלה ב-BigQuery

הטבלה הזו משמשת את הצרכן השני כדי לכתוב את הפלט. הגדרת הסכימה של הטבלה היא "message:STRING, timestamp:STRING".

bq mk --dataset --data_location US $PROJECT_ID:$DATASET_ID 
bq mk --table $PROJECT_ID:$DATASET_ID.$TABLE_ID message:STRING,timestamp:STRING

5. צריך להפעיל את המפיק כדי להתחיל לשלוח הודעות לנושא

mvn clean install
mvn exec:java \
  -Dexec.mainClass="org.kafka.SimpleKafkaProducer" \
  -Dexec.args="$TOPIC"

יומני הפלט צריכים להיראות דומים ל:

...
Message sent: {"message":"283b7961-44cd-46d4-9061-5a22b8a1bdd7","timestamp":"2023-09-15 12:17:09"}
Message sent: {"message":"e4c2110a-ebbe-4c96-88d1-56ffdc2a3e9a","timestamp":"2023-09-15 12:17:14"}
...

6. הרצת הצרכן הראשון שמנתק הודעות בנושא במסוף

mvn clean install
mvn exec:java \
  -Dexec.mainClass="org.kafka.SimpleKafkaConsumer1" \
  -Dexec.args="$TOPIC"

יומני הפלט צריכים להיראות דומים ל:

...
Received message: {"message":"72d46b42-5014-4d28-a6e3-04b65de63826","timestamp":"2023-09-15 12:32:47"}
Received message: {"message":"631464dc-2651-4cce-826f-c9442beb3e98","timestamp":"2023-09-15 12:32:52"}
...

7. מריצים את הצרכן השני שכותב הודעות מנושא kafka לטבלת BigQuery

mvn clean install
mvn exec:java \
  -Dexec.mainClass="org.kafka.SimpleKafkaConsumer2" \
  -Dexec.args="$TOPIC $PROJECT_ID $DATASET_ID $TABLE_ID"

יומני הפלט צריכים להיראות דומים ל:

...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...

8. בדיקה שההודעות נכתבות בהצלחה ל-BigQuery במסוף GCP

8734b356c59543af.png

4. הגדרת Pubsub

1. הפעלת Pubsub

gcloud services enable pubsub.googleapis.com

2. יצירת נושא Pubsub

נושא זה יחליף בסופו של דבר את נושא הקפקא. כדי לשמור על פשטות, אפשר להשתמש באותו שם כמו שם הנושא kafka.

export TOPIC = "my-topic"
gcloud pubsub topics create $TOPIC

5. העברה הדרגתית

עכשיו, אחרי שהגדרנו את אפליקציית Kafka ויצרנו נושא Pub/Sub להעברה, נמשיך בתהליך ההעברה מ-Kafka ל-Pub/Sub.

בהדגמה הזו של ההעברה נשתמש במחבר Pub/Sub Kafka של קבוצת Google Cloud Pub/Sub Group, שמאפשר להעביר את תשתית kafka בשלבים.

שלב 1

הגדרת מחבר Pub/Sub להעברת כל ההודעות מנושא Kafka לנושא Pub/Sub

1. קבלת הצנצנת מחבר מסוג kafka-to-pubsub על ידי פיתוח מאגר המחברים

git clone https://github.com/googleapis/java-pubsub-group-kafka-connector
cd java-pubsub-group-kafka-connector/
mvn clean package -DskipTests=True

הצנצנת אמורה להופיע ב-target/pubsub-group-kafka-connector-${VERSION}.jar בהצלחה.

יוצרים משתנה עם הנתיב המלא לצנצנת.

export KAFKA_CONNECT_JAR="path/to/target/pubsub-group-kafka-connector-${VERSION}.jar"

2. עדכון ההגדרות האישיות של Kafka באמצעות ההגדרות של Kafka Connect

שינוי הספרייה לתיקיית ההורדות של kafka הקודמת

cd kafka_2.13-3.5.1

פותחים את /config/connect-standalone.properties בתיקיית ההורדות של Kafka, ומוסיפים את נתיב הקובץ של מאגר המחברים שהורדתם אל Plugin.path ומבטלים את הוספת התגובה לשורה במקרה הצורך. לחלופין, אפשר להריץ את ה-CMd הבא

echo "plugin.path=$KAFKA_CONNECT_JAR" >> config/connect-standalone.properties

3. יוצרים CloudPubSubSinkConnector קובץ תצורה עם הנושא kafka, פרויקט pubsub ונושא pubsub שדרושים להעברה. אפשר לראות דוגמה CloudPubSubSinkConnector לקובץ תצורה כאן.

cat <<EOF > config/cps-sink-connector.properties
name=CPSSinkConnector
connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
tasks.max=10
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
topics=$TOPIC
cps.project=$PROJECT_ID
cps.topic=$TOPIC
EOF

4. יש להפעיל את המחבר כדי להתחיל להעביר הודעות מנושא Kafka ל-Pub/Sub

bin/connect-standalone.sh \
config/connect-standalone.properties \
config/cps-sink-connector.properties

במסוף GCP, מוודאים שההודעות מועברות לנושא Pub/Sub

שלב 2

צריך לעדכן את האפליקציות לצרכנים כדי לקבל הודעות מנושא Pub/Sub, בזמן שהמפיק ממשיך לפרסם הודעות ל-Kafka

1. לעדכן את הצרכן שמדפיס הודעות למסוף כדי להירשם ל-Pub/Sub. בדוגמה הבאה, kafka-to-pubsub-demo src, SimplePubsubscriber1 מעודכן לקריאה מהנושא Pubsub.

יצירת מינוי ל-Pub/Sub

export SUBSCRIPTION_ID="sub1"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC

הפעלה של אפליקציית מנויים מעודכנת

cd kafka-to-pubsub-demo
mvn exec:java \
  -Dexec.mainClass="org.pubsub.SimplePubsubSubscriber1" \
  -Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID"

יומני הפלט צריכים להיות דומים ל-

...
Id: 8827699929893588
Data: {"message":"08afe1db-2ace-466d-bcf9-77ffc80a7f58","timestamp":"2023-09-15 15:57:34"}
Id: 8827853608001203
Data: {"message":"557960f7-5f2e-4156-84de-e270127c99de","timestamp":"2023-09-15 15:57:39"}
...

2. מעדכנים את הצרכן שכותב ל-BigQuery להירשם ל-Pub/Sub. בדוגמה הבאה, kafka-to-pubsub-demo src, SimplePubsubscriber1 מעודכן לקריאה מהנושא Pubsub.

יצירת מינוי ל-Pub/Sub

export SUBSCRIPTION_ID="sub2"
gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC

הפעלה של אפליקציית מנויים מעודכנת

cd kafka-to-pubsub-demo
mvn exec:java \
  -Dexec.mainClass="org.pubsub.SimplePubsubSubscriber2" \
  -Dexec.args="$PROJECT_ID $SUBSCRIPTION_ID $DATASET_ID $TABLE_ID"

יומני הפלט צריכים להיות דומים ל-

...
Message inserted to BigQuery successfully.
Message inserted to BigQuery successfully.
...

שלב 3

עדכון המפיקים כדי לפרסם ישירות ל-Pub/Sub

  1. מעדכנים את Kafka Producer src כך שיכתבו ל-Pub/Sub במקום ל-Kafka. בדוגמת kafka-to-pubsub-demo src, מתבצע עדכון של SimplePubsubPublisher כך שישלח הודעות לנושא Pubsub.
  2. עוצרים את המחבר. אפשר להפסיק את המחבר על ידי סיום המחבר הפעיל בסשן הטרמינל של kafka-connect
  3. הפעלת האפליקציה המעודכנת של בעל התוכן הדיגיטלי
cd kafka-to-pubsub-demo
mvn exec:java \
  -Dexec.mainClass="org.pubsub.SimplePubsubPublisher" \
  -Dexec.args="$PROJECT_ID $TOPIC"

6. מזל טוב

כל הכבוד, השלמת בהצלחה את ה-Codelab בנושא העברת אפליקציות קפקא בניהול עצמי ל-Pub/Sub.

הנה כמה קישורים למידע נוסף