การประมวลผลข้อมูล IoT แบบเรียลไทม์โดยใช้ Dataproc และบริการที่มีการจัดการของ Google สำหรับ Apache Kafka
เกี่ยวกับ Codelab นี้
1 บทนำ
อัปเดตล่าสุด: 10-06-2024
ความเป็นมา
อุปกรณ์อินเทอร์เน็ตของสรรพสิ่ง (IoT) ตั้งแต่โซลูชันสมาร์ทโฮมไปจนถึงเซ็นเซอร์อุตสาหกรรมจะสร้างข้อมูลจำนวนมากที่ขอบเครือข่าย ข้อมูลนี้มีประโยชน์อย่างยิ่งสำหรับกรณีการใช้งานที่หลากหลาย เช่น การตรวจสอบอุปกรณ์ การติดตาม การวินิจฉัย การเฝ้าระวัง การปรับให้เหมาะกับผู้ใช้แต่ละราย การเพิ่มประสิทธิภาพของกลุ่มอุปกรณ์ และอื่นๆ อีกมากมาย บริการที่มีการจัดการของ Google สำหรับ Apache Kafka นำเสนอวิธีรับและจัดเก็บสตรีมข้อมูลที่ต่อเนื่องนี้อย่างยืดหยุ่นและยั่งยืนในลักษณะที่ใช้งานง่าย ปลอดภัย และเข้ากันได้กับ OSS ขณะที่ Google Cloud Dataproc ช่วยให้ประมวลผลชุดข้อมูลขนาดใหญ่เหล่านี้สําหรับการวิเคราะห์ข้อมูลได้โดยใช้คลัสเตอร์ Apache Spark และ Hadoop
สิ่งที่คุณจะสร้าง
ในโค้ดแล็บนี้ คุณจะได้สร้างไปป์ไลน์การประมวลผลข้อมูล IoT โดยใช้บริการที่มีการจัดการของ Google สำหรับ Apache Kafka, Dataproc, Python และ Apache Spark ซึ่งทําการวิเคราะห์แบบเรียลไทม์ ไปป์ไลน์จะทําดังนี้
- เผยแพร่ข้อมูลจากอุปกรณ์ IoT ไปยังคลัสเตอร์ Kafka ที่มีการจัดการโดยใช้ GCE VM
- สตรีมข้อมูลจากคลัสเตอร์ Manage Kafka ไปยังคลัสเตอร์ Dataproc
- ประมวลผลข้อมูลโดยใช้งาน Dataproc Spark Streaming
สิ่งที่คุณจะได้เรียนรู้
- วิธีสร้างคลัสเตอร์ Kafka และ Dataproc ที่ Google จัดการ
- วิธีเรียกใช้งานสตรีมมิงโดยใช้ Dataproc
สิ่งที่ต้องมี
- บัญชี GCP ที่ใช้งานอยู่ซึ่งตั้งค่าโปรเจ็กต์ไว้แล้ว หากยังไม่มีบัญชี คุณสามารถลงชื่อสมัครช่วงทดลองใช้ฟรีได้
- ติดตั้งและกำหนดค่า gcloud CLI แล้ว คุณทําตามวิธีการติดตั้ง gcloud CLI ในระบบปฏิบัติการได้
- API ที่เปิดใช้สำหรับ Google Managed Kafka และ Dataproc ในโปรเจ็กต์ GCP
2 ภาพรวม
ในโค้ดแล็บนี้ เราจะติดตามเรื่องราวของบริษัทสมมติชื่อ DemoIOT Solutions DemoIOT Solutions มีอุปกรณ์เซ็นเซอร์ที่วัดและส่งข้อมูลอุณหภูมิ ความชื้น ความดัน ระดับแสง และตำแหน่ง ลูกค้าต้องการตั้งค่าไปป์ไลน์ที่ประมวลผลข้อมูลนี้เพื่อแสดงสถิติแบบเรียลไทม์แก่ลูกค้า การใช้ไปป์ไลน์ดังกล่าวช่วยให้บริษัทให้บริการที่หลากหลายแก่ลูกค้าได้ เช่น การตรวจสอบ คำแนะนำอัตโนมัติ การแจ้งเตือน และข้อมูลเชิงลึกเกี่ยวกับสถานที่ที่ลูกค้าติดตั้งเซ็นเซอร์
โดยเราจะใช้ GCE VM เพื่อจำลองอุปกรณ์ IoT อุปกรณ์จะเผยแพร่ข้อมูลไปยังหัวข้อ Kafka ในคลัสเตอร์ Kafka ที่ Google จัดการ ซึ่งจะอ่านและประมวลผลโดยงานสตรีมมิง Dataproc การตั้งค่าข้อกําหนดเบื้องต้นและหน้าต่อไปนี้จะนำคุณไปยังขั้นตอนเหล่านี้ทั้งหมด
การตั้งค่าเบื้องต้น
- ค้นหาชื่อและหมายเลขโปรเจ็กต์ โปรดดูข้อมูลอ้างอิงจากค้นหาชื่อ หมายเลข และรหัสโปรเจ็กต์
- ซับเน็ต VPC ซึ่งจะช่วยให้สามารถเชื่อมต่อระหว่าง GCE VM, คลัสเตอร์ Kafka และคลัสเตอร์ Dataproc ได้ ทำตามขั้นตอนนี้เพื่อแสดงรายการซับเน็ตที่มีอยู่โดยใช้ gcloud CLI หากจำเป็น ให้ทําตามหัวข้อสร้างเครือข่าย VPC โหมดอัตโนมัติ ซึ่งจะสร้างเครือข่าย VPC ที่มีเครือข่ายย่อยในแต่ละภูมิภาคของ Google Cloud แต่สําหรับวัตถุประสงค์ของโค้ดแล็บนี้ เราจะใช้เครือข่ายย่อยจากภูมิภาคเดียวเท่านั้น
- ในซับเน็ตนี้ ให้ตรวจสอบว่ามีกฎไฟร์วอลล์ที่อนุญาตให้ใช้ข้อมูลขาเข้าทั้งหมดจาก tcp:22 ซึ่งเป็น SSH ที่จำเป็น กฎนี้จะเลือกได้ในส่วนกฎไฟร์วอลล์เมื่อสร้างเครือข่าย ดังนั้นโปรดเลือกกฎนี้
- ที่เก็บข้อมูล GCS คุณจะต้องเข้าถึงที่เก็บข้อมูล Google Cloud Storage เพื่อจัดเก็บทรัพยากรงาน Dataproc และเก็บข้อมูลที่ประมวลผลไว้ หากยังไม่มี คุณสามารถสร้างในโปรเจ็กต์ GCP
ป้อนข้อมูลตัวแปรสภาพแวดล้อม
ในเทอร์มินัลที่คุณเรียกใช้ gcloud CLI ให้ป้อนข้อมูลตัวแปรสภาพแวดล้อมเหล่านี้เพื่อให้ใช้ภายหลังได้
export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>
แทนที่ค่าต่อไปนี้
<project-id>
ที่มีชื่อโปรเจ็กต์ GCP ที่คุณตั้งค่า<project-number>
ด้วยชื่อหมายเลขโปรเจ็กต์จากขั้นตอนก่อนการเริ่มต้นที่ 1<region>
ที่มีชื่อภูมิภาคจากภูมิภาคและโซนที่ใช้ได้ที่คุณต้องการใช้ เช่น เราสามารถใช้us-central1
<zone>
ที่มีชื่อโซนจากภูมิภาคและโซนที่ใช้ได้ในส่วนภูมิภาคที่คุณเลือกไว้ก่อนหน้านี้ เช่น หากเลือกus-central1
เป็นภูมิภาค คุณจะใช้us-central1-f
เป็นโซนได้ ระบบจะใช้โซนนี้เพื่อสร้าง GCE VM ที่จำลองอุปกรณ์ IoT ตรวจสอบว่าเขตของคุณอยู่ในภูมิภาคที่คุณเลือก<subnet-path>
พร้อมเส้นทางแบบเต็มของซับเน็ตจากขั้นตอนที่ 2 ของข้อกําหนดเบื้องต้น ค่าของส่วนนี้ต้องอยู่ในรูปแบบprojects/<project-id>/regions/<region>/subnetworks/<subnet-name>
<bucket-name>
ที่มีชื่อของที่เก็บข้อมูล GCS จากขั้นตอนก่อนการเริ่มต้นใช้งานที่ 3
3 ตั้งค่า Kafka ที่มีการจัดการของ Google
ส่วนนี้จะตั้งค่าคลัสเตอร์ Kafka ที่ Google จัดการ ซึ่งจะติดตั้งใช้งานเซิร์ฟเวอร์ Kafka และสร้างหัวข้อในเซิร์ฟเวอร์นี้เพื่อเผยแพร่และอ่านข้อมูล IoT หลังจากสมัครใช้บริการ DemoIOT Solutions สามารถตั้งค่าคลัสเตอร์นี้เพื่อให้อุปกรณ์ทั้งหมดเผยแพร่ข้อมูลไปยังคลัสเตอร์ได้
สร้างคลัสเตอร์ Kafka ที่มีการจัดการ
- สร้างคลัสเตอร์ Kafka ที่มีการจัดการ ในที่นี้ ชื่อคลัสเตอร์คือ
kafka-iot
gcloud managed-kafka clusters create kafka-iot \
--project=$PROJECT_ID \
--location=$REGION \
--cpu=3 \
--memory=12GiB \
--subnets=$SUBNET_PATH \
--auto-rebalance
คุณควรได้รับการตอบกลับที่คล้ายกับข้อความต่อไปนี้
Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.
Created cluster [kafka-iot].
การสร้างคลัสเตอร์จะใช้เวลา 20-30 นาที โปรดรอให้การดำเนินการนี้เสร็จสมบูรณ์
สร้างหัวข้อ
- สร้างหัวข้อ Kafka ที่มีการจัดการในคลัสเตอร์ ในที่นี้ ชื่อหัวข้อคือ
kafka-iot-topic
gcloud managed-kafka topics create kafka-iot-topic \
--cluster=kafka-iot \
--location=$REGION \
--partitions=10 \
--replication-factor=3
คุณควรเห็นเอาต์พุตที่คล้ายกับต่อไปนี้
Created topic [kafka-iot-topic].
4 ตั้งค่าผู้เผยแพร่โฆษณา
หากต้องการเผยแพร่ไปยังคลัสเตอร์ Kafka ที่มีการจัดการ เราจะตั้งค่าอินสแตนซ์ VM ของ Google Compute Engine ที่เข้าถึง VPC ซึ่งมีซับเน็ตที่คลัสเตอร์ Kafka ที่มีการจัดการใช้ VM นี้จะจำลองอุปกรณ์เซ็นเซอร์ที่ให้บริการโดย DemoIOT Solutions
ขั้นตอน
- สร้างอินสแตนซ์ VM ของ Google Compute Engine ในที่นี้ ชื่อของ GCE VM คือ
publisher-instance
gcloud compute instances create publisher-instance \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=$SUBNET_PATH \
--zone=$ZONE
- ให้สิทธิ์บัญชีบริการเริ่มต้นของ Google Compute Engine เพื่อใช้บริการที่มีการจัดการสำหรับ Apache Kafka
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
- ใช้ SSH เพื่อเชื่อมต่อกับ VM หรือใช้คอนโซล Google Cloud เพื่อ SSH
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
- ติดตั้ง Java เพื่อเรียกใช้เครื่องมือบรรทัดคำสั่งของ Kafka และดาวน์โหลดไบนารีของ Kafka โดยใช้คําสั่งเหล่านี้
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
- ดาวน์โหลดไลบรารีการตรวจสอบสิทธิ์ Kafka ที่มีการจัดการและทรัพยากร Dependency ของไลบรารี รวมถึงกำหนดค่าพร็อพเพอร์ตี้ไคลเอ็นต์ Kafka
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
ดูรายละเอียดเพิ่มเติมเกี่ยวกับการตั้งค่าเครื่องของผู้เผยแพร่โฆษณาได้ที่ตั้งค่าเครื่องไคลเอ็นต์
5 เผยแพร่ไปยัง Kafka ที่มีการจัดการ
เมื่อตั้งค่าผู้เผยแพร่แล้ว เราสามารถใช้บรรทัดคำสั่ง Kafka ในการเผยแพร่ข้อมูลจำลองบางส่วนจาก VM ของ GCE (การจำลองอุปกรณ์ IoT โดย DemoIOT Solutions) ไปยังคลัสเตอร์ Kafka ที่มีการจัดการ
- เนื่องจากเราได้ SSH เข้าสู่อินสแตนซ์ VM ของ GCE แล้ว เราจึงต้องป้อนข้อมูลตัวแปร
PROJECT_ID
อีกครั้ง โดยทำดังนี้
export PROJECT_ID=<project-id>
export REGION=<region>
แทนที่ค่าต่อไปนี้
<project-id>
ที่มีชื่อโปรเจ็กต์ GCP ที่คุณตั้งค่า<region>
ที่มีภูมิภาคที่สร้างคลัสเตอร์ Kafka
- ใช้คำสั่ง
managed-kafka clusters describe
เพื่อรับที่อยู่ IP ของเซิร์ฟเวอร์บูตสตับ Kafka คุณใช้ที่อยู่นี้เพื่อเชื่อมต่อกับคลัสเตอร์ Kafka ได้
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- แสดงรายการหัวข้อในกลุ่ม
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties
คุณควรเห็นเอาต์พุตต่อไปนี้ซึ่งมีหัวข้อ kafka-iot-topic
ที่เราสร้างขึ้นก่อนหน้านี้
__remote_log_metadata
kafka-iot-topic
- คัดลอกและวางสคริปต์นี้ลงในไฟล์ใหม่
publish_iot_data.sh
หากต้องการสร้างไฟล์ใหม่ใน GCE VM คุณสามารถใช้เครื่องมืออย่างvim
หรือnano
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash
NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10
generate_sensor_data() {
local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
local light_level=$((RANDOM % 1000))
echo "\"temperature\": $temperature,"
echo "\"humidity\": $humidity,"
echo "\"pressure\": $pressure,"
echo "\"light_level\": $light_level"
}
generate_location_data() {
local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))
echo "\"latitude\": $latitude,"
echo "\"longitude\": $longitude"
}
generate_device_status() {
local battery_level=$((RANDOM % 101))
local signal_strength=$((RANDOM % 80 - 100))
local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"
echo "\"battery_level\": $battery_level,"
echo "\"signal_strength\": $signal_strength,"
echo "\"connection_type\": \"$connection_type\""
}
publish_to_kafka() {
local device_index=$1
local message_index=$2
local device_id="sensor-$((device_index % NUM_IDS))"
local timestamp=$((start_time + (message_index * message_interval)))
local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")
local json_data=$(cat <<EOF
{
"device_id": "$device_id",
"timestamp": "$date",
"location": {
$(generate_location_data)
},
"sensor_data": {
$(generate_sensor_data)
},
"device_status": {
$(generate_device_status)
},
"metadata": {
"sensor_type": "environmental",
"unit_temperature": "Celsius",
"unit_humidity": "%" ,
"unit_pressure": "hPa",
"unit_light_level": "lux",
"firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
}
}
EOF
)
echo $json_data | jq -rc
}
for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
for device_index in $(seq 0 $((NUM_IDS - 1))); do
publish_to_kafka "$device_index" "$message_index"
done
done | kafka-console-producer.sh \
--topic kafka-iot-topic \
--bootstrap-server $1 \
--producer.config $2
คำอธิบาย
- สคริปต์นี้จะสร้างข้อความ JSON ที่มีค่าที่อ่านได้จากเซ็นเซอร์จำลองซึ่งมีรหัสอุปกรณ์ การประทับเวลา ข้อมูลเซ็นเซอร์ (อุณหภูมิ ความชื้น ความดัน แสง) ข้อมูลตำแหน่ง (ละติจูด ลองจิจูด) สถานะอุปกรณ์ (แบตเตอรี่ สัญญาณ ประเภทการเชื่อมต่อ) และข้อมูลเมตาบางอย่าง
- โดยจะสร้างกระแสข้อความอย่างต่อเนื่องจากอุปกรณ์ที่ไม่ซ้ำกันตามจำนวนที่กำหนด โดยแต่ละอุปกรณ์จะส่งข้อมูลในช่วงเวลาที่ระบุ ซึ่งจำลองอุปกรณ์ IoT ในชีวิตจริง ในส่วนนี้ เราจะเผยแพร่ข้อมูลจากอุปกรณ์ 10 เครื่องที่อ่านค่าได้ 20 ครั้งในแต่ละครั้ง โดยเว้นช่วงเวลา 10 วินาที
- รวมถึงเผยแพร่ข้อมูลที่สร้างขึ้นทั้งหมดไปยังหัวข้อ Kafka โดยใช้เครื่องมือบรรทัดคำสั่งของ Kafka Producer
- ติดตั้งข้อกําหนดเบื้องต้นบางอย่างที่ใช้โดยสคริปต์ เช่น แพ็กเกจ
bc
สําหรับการคํานวณทางคณิตศาสตร์ และแพ็กเกจjq
สําหรับการประมวลผล JSON
sudo apt-get install bc jq
- แก้ไขสคริปต์ให้เป็นแบบที่เรียกใช้ได้ แล้วเรียกใช้สคริปต์ การดำเนินการนี้จะใช้เวลาประมาณ 2 นาที
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties
คุณสามารถตรวจสอบว่าเหตุการณ์เผยแพร่สําเร็จแล้วโดยเรียกใช้คําสั่งนี้ซึ่งจะพิมพ์เหตุการณ์ทั้งหมด กด <control-c>
เพื่อออก
kafka-console-consumer.sh \
--topic kafka-iot-topic \
--from-beginning \
--bootstrap-server $BOOTSTRAP \
--consumer.config client.properties
6 ตั้งค่าคลัสเตอร์ Dataproc
ส่วนนี้จะสร้างคลัสเตอร์ Dataproc ในเครือข่ายย่อย VPC ที่มีคลัสเตอร์ Kafka ที่มีการจัดการ ระบบจะใช้คลัสเตอร์นี้เพื่อเรียกใช้งานที่สร้างสถิติและข้อมูลเชิงลึกแบบเรียลไทม์ที่ DemoIOT Solutions ต้องการ
- สร้างคลัสเตอร์ Dataproc ในที่นี้ ชื่อคลัสเตอร์คือ
dataproc-iot
gcloud dataproc clusters create dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION \
--image-version=2.2-debian12 \
--enable-component-gateway \
--subnet=$SUBNET_PATH \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--num-workers=2 \
--properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G
คุณควรได้รับการตอบกลับที่คล้ายกับข้อความต่อไปนี้
Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.
Waiting for cluster creation operation...done.
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].
การสร้างคลัสเตอร์อาจใช้เวลา 10-15 นาที รอให้การดำเนินการนี้เสร็จสมบูรณ์และตรวจสอบว่าคลัสเตอร์อยู่ในสถานะ RUNNING
โดยการอธิบายคลัสเตอร์
gcloud dataproc clusters describe dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION
7 ประมวลผลข้อความ Kafka โดยใช้ Dataproc
ในส่วนสุดท้ายนี้ คุณจะส่งงาน Dataproc ที่ประมวลผลข้อความที่เผยแพร่โดยใช้ Spark Streaming งานนี้สร้างสถิติและข้อมูลเชิงลึกแบบเรียลไทม์ที่ DemoIOT Solutions ใช้ได้
- เรียกใช้คําสั่งนี้เพื่อสร้างไฟล์งาน PySpark สตรีมมิงที่ชื่อ
process_iot.py
ในเครื่อง
cat > process_iot.py <<EOF
#!/bin/python
import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException
JSON_SCHEMA = StructType([
StructField("device_id", StringType()),
StructField("timestamp", StringType()),
StructField(
"location",
StructType([
StructField("latitude", FloatType()),
StructField("longitude", FloatType()),
]),
),
StructField(
"sensor_data",
StructType([
StructField("temperature", FloatType()),
StructField("humidity", FloatType()),
StructField("pressure", FloatType()),
StructField("light_level", IntegerType()),
]),
),
StructField(
"device_status",
StructType([
StructField("battery_level", IntegerType()),
StructField("signal_strength", IntegerType()),
StructField("connection_type", StringType()),
]),
),
StructField(
"metadata",
StructType([
StructField("sensor_type", StringType()),
StructField("unit_temperature", StringType()),
StructField("unit_humidity", StringType()),
StructField("unit_pressure", StringType()),
StructField("unit_light_level", StringType()),
StructField("firmware_version", StringType()),
]),
),
])
CLIENT_PROPERTY_KEYS = [
"security.protocol",
"sasl.mechanism",
"sasl.login.callback.handler.class",
"sasl.jaas.config",
]
def get_client_properties(client_properties_path: str):
# Parse client properties file
parsed_path = urlparse(client_properties_path)
if parsed_path.scheme != "gs":
raise ValueError("Invalid GCS path for client properties. Must start with gs://.")
bucket_name = parsed_path.netloc
blob_name = parsed_path.path.lstrip("/")
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
file_content = "[DEFAULT]\n" + blob.download_as_text()
config = configparser.ConfigParser()
config.read_string(file_content)
client_properties = dict()
for key in CLIENT_PROPERTY_KEYS:
client_properties[key] = config.get("DEFAULT", key)
print(f"Client properties: {client_properties}")
return client_properties
def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
print("Starting initial data processing...")
initial_rows = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_server_address)
.option("startingOffsets", "earliest")
.option("subscribe", "kafka-iot-topic")
.option("kafka.security.protocol", client_properties["security.protocol"])
.option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
.option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
.option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
.load()
)
initial_rows = (
initial_rows.selectExpr("CAST(value AS STRING)")
.select(from_json(col("value"), JSON_SCHEMA).alias("data"))
.select("data.*")
)
# Print first 20 rows
def print_top_rows(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
batch_df.limit(20).show(truncate=False)
initial_query_print = initial_rows.writeStream \
.foreachBatch(print_top_rows) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_print)
# Calculate and print average temperatures
def process_initial_avg_temp(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
current_averages = (
batch_df.select("device_id", "sensor_data.temperature")
.groupBy("device_id")
.agg(avg("temperature").alias("average_temperature"))
.orderBy("device_id")
)
current_averages.show(truncate=False)
initial_query_avg_temp = initial_rows.writeStream \
.foreachBatch(process_initial_avg_temp) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_avg_temp)
# Write data to GCS
initial_data_gcs_writer = (
initial_rows.writeStream.outputMode("append")
.format("avro")
.option("path", store_data_gcs_path+"/tables/iot_avro/")
.option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
.trigger(once=True) \
.start()
)
queries_to_await.append(initial_data_gcs_writer)
def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):
client_properties = get_client_properties(client_properties_path)
# Create SparkSession
spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
queries_to_await = []
process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
# Wait for all queries to terminate
for query in queries_to_await:
try:
query.awaitTermination()
except Exception as e:
print(f"Error awaiting query: {e}")
finally:
query.stop()
spark.stop()
if __name__ == "__main__":
if len(sys.argv) < 4:
print("Invalid number of arguments passed ", len(sys.argv))
print(
"Usage: ",
sys.argv[0],
" <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
)
raise ValueError("Invalid number of arguments passed.")
parsed_data_path = urlparse(sys.argv[3])
if parsed_data_path.scheme != "gs":
raise ValueError("Invalid GCS path for storing data. Must start with gs://.")
main(sys.argv[1], sys.argv[2], sys.argv[3])
EOF
คำอธิบาย
- โค้ดนี้จะตั้งค่างาน Structured Streaming ของ PySpark เพื่ออ่านข้อมูลจากหัวข้อ Kafka ที่ระบุ โดยจะใช้ที่อยู่การบูตสแตปของเซิร์ฟเวอร์ Kafka ที่ระบุและการกำหนดค่า Kafka ที่โหลดจากไฟล์การกําหนดค่า GCS เพื่อเชื่อมต่อและตรวจสอบสิทธิ์กับโบรกเกอร์ Kafka
- โดยเริ่มจากอ่านข้อมูลดิบจาก Kafka เป็นสตรีมอาร์เรย์ไบต์ จากนั้นแคสต์อาร์เรย์ไบต์เหล่านั้นเป็นสตริง และใช้
json_schema
โดยใช้ StructType ของ Spark เพื่อระบุโครงสร้างของข้อมูล (รหัสอุปกรณ์ การประทับเวลา ตำแหน่ง ข้อมูลเซ็นเซอร์ ฯลฯ) - โดยจะพิมพ์ 10 แถวแรกไปยังคอนโซลเพื่อแสดงตัวอย่าง คำนวณอุณหภูมิเฉลี่ยต่อเซ็นเซอร์ และเขียนข้อมูลทั้งหมดลงในที่เก็บข้อมูล GCS ในรูปแบบ
avro
Avro เป็นระบบการจัดรูปแบบข้อมูลตามแถวซึ่งจัดเก็บ Structured Data ได้อย่างมีประสิทธิภาพในรูปแบบไบนารีที่กะทัดรัดซึ่งกำหนดโดยสคีมา โดยนำเสนอการพัฒนาสคีมา ความเป็นกลางทางภาษา และการบีบอัดสูงสำหรับการประมวลผลข้อมูลขนาดใหญ่
- สร้างไฟล์
client.properties
และป้อนข้อมูลตัวแปรสภาพแวดล้อมสําหรับที่อยู่บูตสตราปของเซิร์ฟเวอร์ kafka
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- อัปโหลดไฟล์
process_iot.py
และclient.properties
ไปยังที่เก็บข้อมูล Google Cloud Storage เพื่อให้งาน Dataproc ใช้งานได้
gsutil cp process_iot.py client.properties $BUCKET
- คัดลอกไฟล์ jar ของ Dependency บางรายการสำหรับงาน Dataproc ไปยังที่เก็บข้อมูล GCS ไดเรกทอรีนี้มีไฟล์ jar ที่จําเป็นสําหรับเรียกใช้งาน Spark Streaming ด้วย Kafka รวมถึงไลบรารีการตรวจสอบสิทธิ์ Kafka ที่มีการจัดการและทรัพยากร Dependency ของไลบรารีดังกล่าว ซึ่งนำมาจากตั้งค่าเครื่องไคลเอ็นต์
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
- ส่งงาน Spark ไปยังคลัสเตอร์ Dataproc
gcloud dataproc jobs submit pyspark \
$BUCKET/process_iot.py \
--project=$PROJECT_ID \
--region=$REGION \
--cluster=dataproc-iot \
--properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
-- $BOOTSTRAP $BUCKET/client.properties $BUCKET
ระบบจะพิมพ์บันทึกของไดรเวอร์ Spark นอกจากนี้ คุณควรเห็นตารางเหล่านี้บันทึกไว้ในคอนโซลและข้อมูลที่จัดเก็บไว้ในที่เก็บข้อมูล GCS ด้วย
25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp |location |sensor_data |device_status |metadata |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954 |
|sensor-1 |20.475 |
|sensor-2 |20.475 |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138 |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422 |
|sensor-9 |20.405000019073487 |
+---------+-------------------+
8 ล้างข้อมูล
ทำตามขั้นตอนเพื่อล้างข้อมูลทรัพยากรหลังจากทำ Codelab ให้เสร็จสมบูรณ์
- ลบคลัสเตอร์ Kafka ที่มีการจัดการ, VM GCE ของผู้เผยแพร่โฆษณา และคลัสเตอร์ Dataproc
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
- ลบเครือข่ายย่อยและเครือข่าย VPC
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
- ลบที่เก็บข้อมูล GCS หากไม่ต้องการใช้ข้อมูลอีกต่อไป
gcloud storage rm --recursive $BUCKET
9 ขอแสดงความยินดี
ยินดีด้วย คุณสร้างไปป์ไลน์การประมวลผลข้อมูล IoT ด้วย Manage Kafka และ Dataproc เรียบร้อยแล้ว ซึ่งช่วยให้ DemoIOT Solutions ได้รับข้อมูลเชิงลึกแบบเรียลไทม์เกี่ยวกับข้อมูลที่เผยแพร่โดยอุปกรณ์
คุณได้สร้างคลัสเตอร์ Kafka ที่มีการจัดการ เผยแพร่เหตุการณ์ IoT ไปยังคลัสเตอร์ และเรียกใช้งาน Dataproc ที่ใช้สตรีมมิง Spark เพื่อประมวลผลเหตุการณ์เหล่านี้แบบเรียลไทม์ ตอนนี้คุณทราบขั้นตอนสําคัญในการสร้างไปป์ไลน์ข้อมูลโดยใช้ Managed Kafka และ Dataproc แล้ว