การประมวลผลข้อมูล IoT แบบเรียลไทม์โดยใช้ Dataproc และบริการที่มีการจัดการของ Google สำหรับ Apache Kafka

การประมวลผลข้อมูล IoT แบบเรียลไทม์โดยใช้ Dataproc และบริการที่มีการจัดการของ Google สำหรับ Apache Kafka

เกี่ยวกับ Codelab นี้

subjectอัปเดตล่าสุดเมื่อ มิ.ย. 16, 2025
account_circleเขียนโดย Devanshi Khatsuriya

1 บทนำ

2efacab8643a653b.png

อัปเดตล่าสุด: 10-06-2024

ความเป็นมา

อุปกรณ์อินเทอร์เน็ตของสรรพสิ่ง (IoT) ตั้งแต่โซลูชันสมาร์ทโฮมไปจนถึงเซ็นเซอร์อุตสาหกรรมจะสร้างข้อมูลจำนวนมากที่ขอบเครือข่าย ข้อมูลนี้มีประโยชน์อย่างยิ่งสำหรับกรณีการใช้งานที่หลากหลาย เช่น การตรวจสอบอุปกรณ์ การติดตาม การวินิจฉัย การเฝ้าระวัง การปรับให้เหมาะกับผู้ใช้แต่ละราย การเพิ่มประสิทธิภาพของกลุ่มอุปกรณ์ และอื่นๆ อีกมากมาย บริการที่มีการจัดการของ Google สำหรับ Apache Kafka นำเสนอวิธีรับและจัดเก็บสตรีมข้อมูลที่ต่อเนื่องนี้อย่างยืดหยุ่นและยั่งยืนในลักษณะที่ใช้งานง่าย ปลอดภัย และเข้ากันได้กับ OSS ขณะที่ Google Cloud Dataproc ช่วยให้ประมวลผลชุดข้อมูลขนาดใหญ่เหล่านี้สําหรับการวิเคราะห์ข้อมูลได้โดยใช้คลัสเตอร์ Apache Spark และ Hadoop

สิ่งที่คุณจะสร้าง

ในโค้ดแล็บนี้ คุณจะได้สร้างไปป์ไลน์การประมวลผลข้อมูล IoT โดยใช้บริการที่มีการจัดการของ Google สำหรับ Apache Kafka, Dataproc, Python และ Apache Spark ซึ่งทําการวิเคราะห์แบบเรียลไทม์ ไปป์ไลน์จะทําดังนี้

  • เผยแพร่ข้อมูลจากอุปกรณ์ IoT ไปยังคลัสเตอร์ Kafka ที่มีการจัดการโดยใช้ GCE VM
  • สตรีมข้อมูลจากคลัสเตอร์ Manage Kafka ไปยังคลัสเตอร์ Dataproc
  • ประมวลผลข้อมูลโดยใช้งาน Dataproc Spark Streaming

สิ่งที่คุณจะได้เรียนรู้

  • วิธีสร้างคลัสเตอร์ Kafka และ Dataproc ที่ Google จัดการ
  • วิธีเรียกใช้งานสตรีมมิงโดยใช้ Dataproc

สิ่งที่ต้องมี

2 ภาพรวม

ในโค้ดแล็บนี้ เราจะติดตามเรื่องราวของบริษัทสมมติชื่อ DemoIOT Solutions DemoIOT Solutions มีอุปกรณ์เซ็นเซอร์ที่วัดและส่งข้อมูลอุณหภูมิ ความชื้น ความดัน ระดับแสง และตำแหน่ง ลูกค้าต้องการตั้งค่าไปป์ไลน์ที่ประมวลผลข้อมูลนี้เพื่อแสดงสถิติแบบเรียลไทม์แก่ลูกค้า การใช้ไปป์ไลน์ดังกล่าวช่วยให้บริษัทให้บริการที่หลากหลายแก่ลูกค้าได้ เช่น การตรวจสอบ คำแนะนำอัตโนมัติ การแจ้งเตือน และข้อมูลเชิงลึกเกี่ยวกับสถานที่ที่ลูกค้าติดตั้งเซ็นเซอร์

โดยเราจะใช้ GCE VM เพื่อจำลองอุปกรณ์ IoT อุปกรณ์จะเผยแพร่ข้อมูลไปยังหัวข้อ Kafka ในคลัสเตอร์ Kafka ที่ Google จัดการ ซึ่งจะอ่านและประมวลผลโดยงานสตรีมมิง Dataproc การตั้งค่าข้อกําหนดเบื้องต้นและหน้าต่อไปนี้จะนำคุณไปยังขั้นตอนเหล่านี้ทั้งหมด

การตั้งค่าเบื้องต้น

  1. ค้นหาชื่อและหมายเลขโปรเจ็กต์ โปรดดูข้อมูลอ้างอิงจากค้นหาชื่อ หมายเลข และรหัสโปรเจ็กต์
  2. ซับเน็ต VPC ซึ่งจะช่วยให้สามารถเชื่อมต่อระหว่าง GCE VM, คลัสเตอร์ Kafka และคลัสเตอร์ Dataproc ได้ ทำตามขั้นตอนนี้เพื่อแสดงรายการซับเน็ตที่มีอยู่โดยใช้ gcloud CLI หากจำเป็น ให้ทําตามหัวข้อสร้างเครือข่าย VPC โหมดอัตโนมัติ ซึ่งจะสร้างเครือข่าย VPC ที่มีเครือข่ายย่อยในแต่ละภูมิภาคของ Google Cloud แต่สําหรับวัตถุประสงค์ของโค้ดแล็บนี้ เราจะใช้เครือข่ายย่อยจากภูมิภาคเดียวเท่านั้น
  • ในซับเน็ตนี้ ให้ตรวจสอบว่ามีกฎไฟร์วอลล์ที่อนุญาตให้ใช้ข้อมูลขาเข้าทั้งหมดจาก tcp:22 ซึ่งเป็น SSH ที่จำเป็น กฎนี้จะเลือกได้ในส่วนกฎไฟร์วอลล์เมื่อสร้างเครือข่าย ดังนั้นโปรดเลือกกฎนี้
  1. ที่เก็บข้อมูล GCS คุณจะต้องเข้าถึงที่เก็บข้อมูล Google Cloud Storage เพื่อจัดเก็บทรัพยากรงาน Dataproc และเก็บข้อมูลที่ประมวลผลไว้ หากยังไม่มี คุณสามารถสร้างในโปรเจ็กต์ GCP

ป้อนข้อมูลตัวแปรสภาพแวดล้อม

ในเทอร์มินัลที่คุณเรียกใช้ gcloud CLI ให้ป้อนข้อมูลตัวแปรสภาพแวดล้อมเหล่านี้เพื่อให้ใช้ภายหลังได้

export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>

แทนที่ค่าต่อไปนี้

  • <project-id> ที่มีชื่อโปรเจ็กต์ GCP ที่คุณตั้งค่า
  • <project-number> ด้วยชื่อหมายเลขโปรเจ็กต์จากขั้นตอนก่อนการเริ่มต้นที่ 1
  • <region> ที่มีชื่อภูมิภาคจากภูมิภาคและโซนที่ใช้ได้ที่คุณต้องการใช้ เช่น เราสามารถใช้ us-central1
  • <zone> ที่มีชื่อโซนจากภูมิภาคและโซนที่ใช้ได้ในส่วนภูมิภาคที่คุณเลือกไว้ก่อนหน้านี้ เช่น หากเลือก us-central1 เป็นภูมิภาค คุณจะใช้ us-central1-f เป็นโซนได้ ระบบจะใช้โซนนี้เพื่อสร้าง GCE VM ที่จำลองอุปกรณ์ IoT ตรวจสอบว่าเขตของคุณอยู่ในภูมิภาคที่คุณเลือก
  • <subnet-path> พร้อมเส้นทางแบบเต็มของซับเน็ตจากขั้นตอนที่ 2 ของข้อกําหนดเบื้องต้น ค่าของส่วนนี้ต้องอยู่ในรูปแบบ projects/<project-id>/regions/<region>/subnetworks/<subnet-name>
  • <bucket-name> ที่มีชื่อของที่เก็บข้อมูล GCS จากขั้นตอนก่อนการเริ่มต้นใช้งานที่ 3

3 ตั้งค่า Kafka ที่มีการจัดการของ Google

ส่วนนี้จะตั้งค่าคลัสเตอร์ Kafka ที่ Google จัดการ ซึ่งจะติดตั้งใช้งานเซิร์ฟเวอร์ Kafka และสร้างหัวข้อในเซิร์ฟเวอร์นี้เพื่อเผยแพร่และอ่านข้อมูล IoT หลังจากสมัครใช้บริการ DemoIOT Solutions สามารถตั้งค่าคลัสเตอร์นี้เพื่อให้อุปกรณ์ทั้งหมดเผยแพร่ข้อมูลไปยังคลัสเตอร์ได้

สร้างคลัสเตอร์ Kafka ที่มีการจัดการ

  • สร้างคลัสเตอร์ Kafka ที่มีการจัดการ ในที่นี้ ชื่อคลัสเตอร์คือ kafka-iot
gcloud managed-kafka clusters create kafka-iot \
   
--project=$PROJECT_ID \
   
--location=$REGION \
   
--cpu=3 \
   
--memory=12GiB \
   
--subnets=$SUBNET_PATH \
   
--auto-rebalance

คุณควรได้รับการตอบกลับที่คล้ายกับข้อความต่อไปนี้

Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.                                        
Created cluster [kafka-iot].

การสร้างคลัสเตอร์จะใช้เวลา 20-30 นาที โปรดรอให้การดำเนินการนี้เสร็จสมบูรณ์

สร้างหัวข้อ

  • สร้างหัวข้อ Kafka ที่มีการจัดการในคลัสเตอร์ ในที่นี้ ชื่อหัวข้อคือ kafka-iot-topic
gcloud managed-kafka topics create kafka-iot-topic \
   
--cluster=kafka-iot \
   
--location=$REGION \
   
--partitions=10 \
   
--replication-factor=3

คุณควรเห็นเอาต์พุตที่คล้ายกับต่อไปนี้

Created topic [kafka-iot-topic].

4 ตั้งค่าผู้เผยแพร่โฆษณา

หากต้องการเผยแพร่ไปยังคลัสเตอร์ Kafka ที่มีการจัดการ เราจะตั้งค่าอินสแตนซ์ VM ของ Google Compute Engine ที่เข้าถึง VPC ซึ่งมีซับเน็ตที่คลัสเตอร์ Kafka ที่มีการจัดการใช้ VM นี้จะจำลองอุปกรณ์เซ็นเซอร์ที่ให้บริการโดย DemoIOT Solutions

ขั้นตอน

  1. สร้างอินสแตนซ์ VM ของ Google Compute Engine ในที่นี้ ชื่อของ GCE VM คือ publisher-instance
gcloud compute instances create publisher-instance \
   
--scopes=https://www.googleapis.com/auth/cloud-platform \
   
--subnet=$SUBNET_PATH \
   
--zone=$ZONE
  1. ให้สิทธิ์บัญชีบริการเริ่มต้นของ Google Compute Engine เพื่อใช้บริการที่มีการจัดการสำหรับ Apache Kafka
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role=roles/managedkafka.client
  1. ใช้ SSH เพื่อเชื่อมต่อกับ VM หรือใช้คอนโซล Google Cloud เพื่อ SSH
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
  1. ติดตั้ง Java เพื่อเรียกใช้เครื่องมือบรรทัดคำสั่งของ Kafka และดาวน์โหลดไบนารีของ Kafka โดยใช้คําสั่งเหล่านี้
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz  https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
  1. ดาวน์โหลดไลบรารีการตรวจสอบสิทธิ์ Kafka ที่มีการจัดการและทรัพยากร Dependency ของไลบรารี รวมถึงกำหนดค่าพร็อพเพอร์ตี้ไคลเอ็นต์ Kafka
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security
.protocol=SASL_SSL
sasl
.mechanism=OAUTHBEARER
sasl
.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl
.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF

ดูรายละเอียดเพิ่มเติมเกี่ยวกับการตั้งค่าเครื่องของผู้เผยแพร่โฆษณาได้ที่ตั้งค่าเครื่องไคลเอ็นต์

5 เผยแพร่ไปยัง Kafka ที่มีการจัดการ

เมื่อตั้งค่าผู้เผยแพร่แล้ว เราสามารถใช้บรรทัดคำสั่ง Kafka ในการเผยแพร่ข้อมูลจำลองบางส่วนจาก VM ของ GCE (การจำลองอุปกรณ์ IoT โดย DemoIOT Solutions) ไปยังคลัสเตอร์ Kafka ที่มีการจัดการ

  1. เนื่องจากเราได้ SSH เข้าสู่อินสแตนซ์ VM ของ GCE แล้ว เราจึงต้องป้อนข้อมูลตัวแปร PROJECT_ID อีกครั้ง โดยทำดังนี้
export PROJECT_ID=<project-id>
export REGION=<region>

แทนที่ค่าต่อไปนี้

  • <project-id> ที่มีชื่อโปรเจ็กต์ GCP ที่คุณตั้งค่า
  • <region> ที่มีภูมิภาคที่สร้างคลัสเตอร์ Kafka
  1. ใช้คำสั่ง managed-kafka clusters describe เพื่อรับที่อยู่ IP ของเซิร์ฟเวอร์บูตสตับ Kafka คุณใช้ที่อยู่นี้เพื่อเชื่อมต่อกับคลัสเตอร์ Kafka ได้
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. แสดงรายการหัวข้อในกลุ่ม
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties

คุณควรเห็นเอาต์พุตต่อไปนี้ซึ่งมีหัวข้อ kafka-iot-topic ที่เราสร้างขึ้นก่อนหน้านี้

__remote_log_metadata
kafka
-iot-topic
  1. คัดลอกและวางสคริปต์นี้ลงในไฟล์ใหม่ publish_iot_data.sh หากต้องการสร้างไฟล์ใหม่ใน GCE VM คุณสามารถใช้เครื่องมืออย่าง vim หรือ nano
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data
.sh
#!/bin/bash

NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10

generate_sensor_data
() {
  local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
  local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
  local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
  local light_level=$((RANDOM % 1000))

  echo "\"temperature\": $temperature,"
  echo "\"humidity\": $humidity,"
  echo "\"pressure\": $pressure,"
  echo "\"light_level\": $light_level"
}

generate_location_data
() {
  local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
  local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))

  echo "\"latitude\": $latitude,"
  echo "\"longitude\": $longitude"
}

generate_device_status
() {
  local battery_level=$((RANDOM % 101))
  local signal_strength=$((RANDOM % 80 - 100))
  local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
  local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"

  echo "\"battery_level\": $battery_level,"
  echo "\"signal_strength\": $signal_strength,"
  echo "\"connection_type\": \"$connection_type\""
}

publish_to_kafka
() {
  local device_index=$1
  local message_index=$2
  local device_id="sensor-$((device_index % NUM_IDS))"
  local timestamp=$((start_time + (message_index * message_interval)))
  local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")

  local json_data=$(cat <<EOF
{
  "device_id": "$device_id",
  "timestamp": "$date",
  "location": {
$(generate_location_data)
  },
  "sensor_data": {
$(generate_sensor_data)
  },
  "device_status": {
$(generate_device_status)
  },
  "metadata": {
    "sensor_type": "environmental",
    "unit_temperature": "Celsius",
    "unit_humidity": "%" ,
    "unit_pressure": "hPa",
    "unit_light_level": "lux",
    "firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
  }
}
EOF
)

  echo $json_data | jq -rc
}

for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
  for device_index in $(seq 0 $((NUM_IDS - 1))); do
    publish_to_kafka "$device_index" "$message_index"
  done
done | kafka-console-producer.sh \
    --topic kafka-iot-topic \
    --bootstrap-server $1 \
    --producer.config $2

คำอธิบาย

  • สคริปต์นี้จะสร้างข้อความ JSON ที่มีค่าที่อ่านได้จากเซ็นเซอร์จำลองซึ่งมีรหัสอุปกรณ์ การประทับเวลา ข้อมูลเซ็นเซอร์ (อุณหภูมิ ความชื้น ความดัน แสง) ข้อมูลตำแหน่ง (ละติจูด ลองจิจูด) สถานะอุปกรณ์ (แบตเตอรี่ สัญญาณ ประเภทการเชื่อมต่อ) และข้อมูลเมตาบางอย่าง
  • โดยจะสร้างกระแสข้อความอย่างต่อเนื่องจากอุปกรณ์ที่ไม่ซ้ำกันตามจำนวนที่กำหนด โดยแต่ละอุปกรณ์จะส่งข้อมูลในช่วงเวลาที่ระบุ ซึ่งจำลองอุปกรณ์ IoT ในชีวิตจริง ในส่วนนี้ เราจะเผยแพร่ข้อมูลจากอุปกรณ์ 10 เครื่องที่อ่านค่าได้ 20 ครั้งในแต่ละครั้ง โดยเว้นช่วงเวลา 10 วินาที
  • รวมถึงเผยแพร่ข้อมูลที่สร้างขึ้นทั้งหมดไปยังหัวข้อ Kafka โดยใช้เครื่องมือบรรทัดคำสั่งของ Kafka Producer
  1. ติดตั้งข้อกําหนดเบื้องต้นบางอย่างที่ใช้โดยสคริปต์ เช่น แพ็กเกจ bc สําหรับการคํานวณทางคณิตศาสตร์ และแพ็กเกจ jq สําหรับการประมวลผล JSON
sudo apt-get install bc jq
  1. แก้ไขสคริปต์ให้เป็นแบบที่เรียกใช้ได้ แล้วเรียกใช้สคริปต์ การดำเนินการนี้จะใช้เวลาประมาณ 2 นาที
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties

คุณสามารถตรวจสอบว่าเหตุการณ์เผยแพร่สําเร็จแล้วโดยเรียกใช้คําสั่งนี้ซึ่งจะพิมพ์เหตุการณ์ทั้งหมด กด <control-c> เพื่อออก

kafka-console-consumer.sh \
   
--topic kafka-iot-topic \
   
--from-beginning \
   
--bootstrap-server $BOOTSTRAP \
   
--consumer.config client.properties

6 ตั้งค่าคลัสเตอร์ Dataproc

ส่วนนี้จะสร้างคลัสเตอร์ Dataproc ในเครือข่ายย่อย VPC ที่มีคลัสเตอร์ Kafka ที่มีการจัดการ ระบบจะใช้คลัสเตอร์นี้เพื่อเรียกใช้งานที่สร้างสถิติและข้อมูลเชิงลึกแบบเรียลไทม์ที่ DemoIOT Solutions ต้องการ

  1. สร้างคลัสเตอร์ Dataproc ในที่นี้ ชื่อคลัสเตอร์คือ dataproc-iot
gcloud dataproc clusters create dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION \
    --image-version=2.2-debian12 \
    --enable-component-gateway \
    --subnet=$SUBNET_PATH \
    --master-machine-type=n1-standard-4 \
    --worker-machine-type=n1-standard-4 \
    --num-workers=2 \
    --properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G

คุณควรได้รับการตอบกลับที่คล้ายกับข้อความต่อไปนี้

Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.                                                                                                                                                                      
Waiting for cluster creation operation...done.                                                                                                                                                                    
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].

การสร้างคลัสเตอร์อาจใช้เวลา 10-15 นาที รอให้การดำเนินการนี้เสร็จสมบูรณ์และตรวจสอบว่าคลัสเตอร์อยู่ในสถานะ RUNNING โดยการอธิบายคลัสเตอร์

gcloud dataproc clusters describe dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION

7 ประมวลผลข้อความ Kafka โดยใช้ Dataproc

ในส่วนสุดท้ายนี้ คุณจะส่งงาน Dataproc ที่ประมวลผลข้อความที่เผยแพร่โดยใช้ Spark Streaming งานนี้สร้างสถิติและข้อมูลเชิงลึกแบบเรียลไทม์ที่ DemoIOT Solutions ใช้ได้

  1. เรียกใช้คําสั่งนี้เพื่อสร้างไฟล์งาน PySpark สตรีมมิงที่ชื่อ process_iot.py ในเครื่อง
cat > process_iot.py <<EOF

#!/bin/python

import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException

JSON_SCHEMA = StructType([
   
StructField("device_id", StringType()),
   
StructField("timestamp", StringType()),
   
StructField(
       
"location",
       
StructType([
           
StructField("latitude", FloatType()),
           
StructField("longitude", FloatType()),
       
]),
   
),
   
StructField(
       
"sensor_data",
       
StructType([
           
StructField("temperature", FloatType()),
           
StructField("humidity", FloatType()),
           
StructField("pressure", FloatType()),
           
StructField("light_level", IntegerType()),
       
]),
   
),
   
StructField(
       
"device_status",
       
StructType([
           
StructField("battery_level", IntegerType()),
           
StructField("signal_strength", IntegerType()),
           
StructField("connection_type", StringType()),
       
]),
   
),
   
StructField(
       
"metadata",
       
StructType([
           
StructField("sensor_type", StringType()),
           
StructField("unit_temperature", StringType()),
           
StructField("unit_humidity", StringType()),
           
StructField("unit_pressure", StringType()),
           
StructField("unit_light_level", StringType()),
           
StructField("firmware_version", StringType()),
       
]),
   
),
])
CLIENT_PROPERTY_KEYS = [
   
"security.protocol",
   
"sasl.mechanism",
   
"sasl.login.callback.handler.class",
   
"sasl.jaas.config",
]


def get_client_properties(client_properties_path: str):
   
# Parse client properties file
   
parsed_path = urlparse(client_properties_path)
   
if parsed_path.scheme != "gs":
       
raise ValueError("Invalid GCS path for client properties. Must start with gs://.")

   
bucket_name = parsed_path.netloc
   
blob_name = parsed_path.path.lstrip("/")

   
client = storage.Client()
   
bucket = client.bucket(bucket_name)
   
blob = bucket.blob(blob_name)
   
file_content = "[DEFAULT]\n" + blob.download_as_text()

   
config = configparser.ConfigParser()
   
config.read_string(file_content)

   
client_properties = dict()
   
for key in CLIENT_PROPERTY_KEYS:
       
client_properties[key] = config.get("DEFAULT", key)

   
print(f"Client properties: {client_properties}")
   
return client_properties

def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
   
print("Starting initial data processing...")
   
initial_rows = (
       
spark.readStream.format("kafka")
       
.option("kafka.bootstrap.servers", bootstrap_server_address)
       
.option("startingOffsets", "earliest")
       
.option("subscribe", "kafka-iot-topic")
       
.option("kafka.security.protocol", client_properties["security.protocol"])
       
.option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
       
.option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
       
.option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
       
.load()
   
)

   
initial_rows = (
       
initial_rows.selectExpr("CAST(value AS STRING)")
       
.select(from_json(col("value"), JSON_SCHEMA).alias("data"))
       
.select("data.*")
   
)
   
   
# Print first 20 rows
   
def print_top_rows(batch_df, batch_id):
       
if batch_df.count() > 0:
           
print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
           
batch_df.limit(20).show(truncate=False)

   
initial_query_print = initial_rows.writeStream \
       
.foreachBatch(print_top_rows) \
       
.trigger(once=True) \
       
.start()
   
queries_to_await.append(initial_query_print)

   
# Calculate and print average temperatures
   
def process_initial_avg_temp(batch_df, batch_id):
       
if batch_df.count() > 0:
           
print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
           
current_averages = (
               
batch_df.select("device_id", "sensor_data.temperature")
               
.groupBy("device_id")
               
.agg(avg("temperature").alias("average_temperature"))
               
.orderBy("device_id")
           
)
           
current_averages.show(truncate=False)

   
initial_query_avg_temp = initial_rows.writeStream \
       
.foreachBatch(process_initial_avg_temp) \
       
.trigger(once=True) \
       
.start()
   
queries_to_await.append(initial_query_avg_temp)
   
   
# Write data to GCS
   
initial_data_gcs_writer = (
       
initial_rows.writeStream.outputMode("append")
       
.format("avro")
       
.option("path", store_data_gcs_path+"/tables/iot_avro/")
       
.option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
       
.trigger(once=True) \
       
.start()
   
)
   
queries_to_await.append(initial_data_gcs_writer)


def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):

   
client_properties = get_client_properties(client_properties_path)

   
# Create SparkSession
   
spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
   
   
queries_to_await = []

   
process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
   
   
# Wait for all queries to terminate
   
for query in queries_to_await:
       
try:
           
query.awaitTermination()
       
except Exception as e:
           
print(f"Error awaiting query: {e}")
       
finally:
           
query.stop()

   
spark.stop()

if __name__ == "__main__":
   
if len(sys.argv) < 4:
       
print("Invalid number of arguments passed ", len(sys.argv))
       
print(
           
"Usage: ",
           
sys.argv[0],
           
" <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
       
)
       
raise ValueError("Invalid number of arguments passed.")

   
parsed_data_path = urlparse(sys.argv[3])
   
if parsed_data_path.scheme != "gs":
       
raise ValueError("Invalid GCS path for storing data. Must start with gs://.")


   
main(sys.argv[1], sys.argv[2], sys.argv[3])


EOF

คำอธิบาย

  • โค้ดนี้จะตั้งค่างาน Structured Streaming ของ PySpark เพื่ออ่านข้อมูลจากหัวข้อ Kafka ที่ระบุ โดยจะใช้ที่อยู่การบูตสแตปของเซิร์ฟเวอร์ Kafka ที่ระบุและการกำหนดค่า Kafka ที่โหลดจากไฟล์การกําหนดค่า GCS เพื่อเชื่อมต่อและตรวจสอบสิทธิ์กับโบรกเกอร์ Kafka
  • โดยเริ่มจากอ่านข้อมูลดิบจาก Kafka เป็นสตรีมอาร์เรย์ไบต์ จากนั้นแคสต์อาร์เรย์ไบต์เหล่านั้นเป็นสตริง และใช้ json_schema โดยใช้ StructType ของ Spark เพื่อระบุโครงสร้างของข้อมูล (รหัสอุปกรณ์ การประทับเวลา ตำแหน่ง ข้อมูลเซ็นเซอร์ ฯลฯ)
  • โดยจะพิมพ์ 10 แถวแรกไปยังคอนโซลเพื่อแสดงตัวอย่าง คำนวณอุณหภูมิเฉลี่ยต่อเซ็นเซอร์ และเขียนข้อมูลทั้งหมดลงในที่เก็บข้อมูล GCS ในรูปแบบ avro Avro เป็นระบบการจัดรูปแบบข้อมูลตามแถวซึ่งจัดเก็บ Structured Data ได้อย่างมีประสิทธิภาพในรูปแบบไบนารีที่กะทัดรัดซึ่งกำหนดโดยสคีมา โดยนำเสนอการพัฒนาสคีมา ความเป็นกลางทางภาษา และการบีบอัดสูงสำหรับการประมวลผลข้อมูลขนาดใหญ่
  1. สร้างไฟล์ client.properties และป้อนข้อมูลตัวแปรสภาพแวดล้อมสําหรับที่อยู่บูตสตราปของเซิร์ฟเวอร์ kafka
cat <<EOF> client.properties
security
.protocol=SASL_SSL
sasl
.mechanism=OAUTHBEARER
sasl
.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl
.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. อัปโหลดไฟล์ process_iot.py และ client.properties ไปยังที่เก็บข้อมูล Google Cloud Storage เพื่อให้งาน Dataproc ใช้งานได้
gsutil cp process_iot.py client.properties $BUCKET
  1. คัดลอกไฟล์ jar ของ Dependency บางรายการสำหรับงาน Dataproc ไปยังที่เก็บข้อมูล GCS ไดเรกทอรีนี้มีไฟล์ jar ที่จําเป็นสําหรับเรียกใช้งาน Spark Streaming ด้วย Kafka รวมถึงไลบรารีการตรวจสอบสิทธิ์ Kafka ที่มีการจัดการและทรัพยากร Dependency ของไลบรารีดังกล่าว ซึ่งนำมาจากตั้งค่าเครื่องไคลเอ็นต์
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
  1. ส่งงาน Spark ไปยังคลัสเตอร์ Dataproc
gcloud dataproc jobs submit pyspark \
    $BUCKET/process_iot.py \
    --project=$PROJECT_ID \
    --region=$REGION \
    --cluster=dataproc-iot \
    --properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
    -- $BOOTSTRAP $BUCKET/client.properties $BUCKET

ระบบจะพิมพ์บันทึกของไดรเวอร์ Spark นอกจากนี้ คุณควรเห็นตารางเหล่านี้บันทึกไว้ในคอนโซลและข้อมูลที่จัดเก็บไว้ในที่เก็บข้อมูล GCS ด้วย

25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp           |location        |sensor_data              |device_status       |metadata                                    |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+

25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954  |
|sensor-1 |20.475             |
|sensor-2 |20.475             |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138  |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422  |
|sensor-9 |20.405000019073487 |
+---------+-------------------+

8 ล้างข้อมูล

ทำตามขั้นตอนเพื่อล้างข้อมูลทรัพยากรหลังจากทำ Codelab ให้เสร็จสมบูรณ์

  1. ลบคลัสเตอร์ Kafka ที่มีการจัดการ, VM GCE ของผู้เผยแพร่โฆษณา และคลัสเตอร์ Dataproc
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
  1. ลบเครือข่ายย่อยและเครือข่าย VPC
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
  1. ลบที่เก็บข้อมูล GCS หากไม่ต้องการใช้ข้อมูลอีกต่อไป
gcloud storage rm --recursive $BUCKET

9 ขอแสดงความยินดี

ยินดีด้วย คุณสร้างไปป์ไลน์การประมวลผลข้อมูล IoT ด้วย Manage Kafka และ Dataproc เรียบร้อยแล้ว ซึ่งช่วยให้ DemoIOT Solutions ได้รับข้อมูลเชิงลึกแบบเรียลไทม์เกี่ยวกับข้อมูลที่เผยแพร่โดยอุปกรณ์

คุณได้สร้างคลัสเตอร์ Kafka ที่มีการจัดการ เผยแพร่เหตุการณ์ IoT ไปยังคลัสเตอร์ และเรียกใช้งาน Dataproc ที่ใช้สตรีมมิง Spark เพื่อประมวลผลเหตุการณ์เหล่านี้แบบเรียลไทม์ ตอนนี้คุณทราบขั้นตอนสําคัญในการสร้างไปป์ไลน์ข้อมูลโดยใช้ Managed Kafka และ Dataproc แล้ว

เอกสารอ้างอิง