การประมวลผลข้อมูล IoT แบบเรียลไทม์โดยใช้ Dataproc และบริการที่มีการจัดการของ Google สำหรับ Apache Kafka

1. บทนำ

2efacab8643a653b.png

อัปเดตล่าสุด: 2024-06-10

ข้อมูลเบื้องต้น

อุปกรณ์อินเทอร์เน็ตของสรรพสิ่ง (IoT) ตั้งแต่โซลูชันสมาร์ทโฮมไปจนถึงเซ็นเซอร์อุตสาหกรรมสร้างข้อมูลจำนวนมหาศาลที่ขอบของเครือข่าย ข้อมูลนี้มีประโยชน์อย่างยิ่งสำหรับกรณีการใช้งานที่หลากหลาย เช่น การตรวจสอบอุปกรณ์ การติดตาม การวินิจฉัย การเฝ้าระวัง การปรับเปลี่ยนในแบบของคุณ การเพิ่มประสิทธิภาพกลุ่ม และอื่นๆ อีกมากมาย บริการที่มีการจัดการของ Google สำหรับ Apache Kafka มีวิธีที่ปรับขนาดได้และทนทานในการนำเข้าและจัดเก็บสตรีมข้อมูลต่อเนื่องนี้ในลักษณะที่เข้ากันได้กับ OSS ใช้งานง่ายและปลอดภัย ในขณะที่ Google Cloud Dataproc ช่วยให้ประมวลผลชุดข้อมูลขนาดใหญ่นี้สำหรับการวิเคราะห์ข้อมูลโดยใช้คลัสเตอร์ Apache Spark และ Hadoop ได้

สิ่งที่คุณจะสร้าง

ใน Codelab นี้ คุณจะได้สร้างไปป์ไลน์การประมวลผลข้อมูล IoT โดยใช้บริการที่มีการจัดการของ Google สำหรับ Apache Kafka, Dataproc, Python และ Apache Spark ซึ่งทำการวิเคราะห์แบบเรียลไทม์ ไปป์ไลน์จะทำสิ่งต่อไปนี้

  • เผยแพร่ข้อมูลจากอุปกรณ์ IoT ไปยังคลัสเตอร์ Kafka ที่มีการจัดการโดยใช้ GCE VM
  • สตรีมข้อมูลจากคลัสเตอร์ Kafka ที่มีการจัดการไปยังคลัสเตอร์ Dataproc
  • ประมวลผลข้อมูลโดยใช้ชื่องาน Dataproc Spark Streaming

สิ่งที่คุณจะได้เรียนรู้

  • วิธีสร้างคลัสเตอร์ Kafka และ Dataproc ที่มีการจัดการของ Google
  • วิธีเรียกใช้งานสตรีมมิงโดยใช้ Dataproc

สิ่งที่คุณต้องมี

2. ภาพรวม

สำหรับ Codelab นี้ เราจะมาดูเรื่องราวของบริษัทสมมติ DemoIOT Solutions กัน DemoIOT Solutions มีอุปกรณ์เซ็นเซอร์ที่วัดและส่งข้อมูลอุณหภูมิ ความชื้น ความดัน ระดับแสง และตำแหน่ง และต้องการตั้งค่าไปป์ไลน์ที่ประมวลผลข้อมูลนี้เพื่อแสดงสถิติแบบเรียลไทม์ต่อลูกค้า การใช้ไปป์ไลน์ดังกล่าวช่วยให้ผู้ให้บริการสามารถให้บริการที่หลากหลายแก่ลูกค้า เช่น การตรวจสอบ คำแนะนำอัตโนมัติ การแจ้งเตือน และข้อมูลเชิงลึกเกี่ยวกับสถานที่ที่ลูกค้าติดตั้งเซ็นเซอร์

โดยเราจะใช้ GCE VM เพื่อจำลองอุปกรณ์ IoT อุปกรณ์จะเผยแพร่ข้อมูลไปยังหัวข้อ Kafka ในคลัสเตอร์ Kafka ที่มีการจัดการของ Google ซึ่งจะอ่านและประมวลผลโดยงานการสตรีม Dataproc การตั้งค่าข้อกำหนดเบื้องต้นและหน้าต่อไปนี้จะช่วยให้คุณทำตามขั้นตอนทั้งหมดนี้ได้

การตั้งค่าข้อกำหนดเบื้องต้น

  1. ค้นหาชื่อและหมายเลขโปรเจ็กต์ ดูข้อมูลอ้างอิงได้ที่ค้นหาชื่อ หมายเลข และรหัสโปรเจ็กต์
  2. เครือข่ายย่อย VPC ซึ่งจะช่วยให้เชื่อมต่อระหว่าง VM ของ GCE, คลัสเตอร์ Kafka และคลัสเตอร์ Dataproc ได้ ทำตามนี้เพื่อแสดงรายการเครือข่ายย่อยที่มีอยู่โดยใช้ gcloud CLI หากจำเป็น ให้ทำตามสร้างเครือข่าย VPC โหมดอัตโนมัติ ซึ่งจะสร้างเครือข่าย VPC ที่มีเครือข่ายย่อยในแต่ละภูมิภาคของ Google Cloud อย่างไรก็ตาม เพื่อวัตถุประสงค์ของ Codelab นี้ เราจะใช้เครือข่ายย่อยจากภูมิภาคเดียวเท่านั้น
  • ในเครือข่ายย่อยนี้ ตรวจสอบว่ามีกฎไฟร์วอลล์ที่อนุญาตให้ใช้ข้อมูลขาเข้าทั้งหมดจาก tcp:22 ซึ่งเป็น SSH ที่จำเป็น กฎนี้จะพร้อมให้เลือกในส่วนกฎไฟร์วอลล์เมื่อสร้างเครือข่าย ดังนั้นโปรดตรวจสอบว่าคุณได้เลือกกฎนี้
  1. Bucket ของ GCS คุณจะต้องมีสิทธิ์เข้าถึงที่เก็บข้อมูล Google Cloud เพื่อจัดเก็บทรัพยากรของงาน Dataproc และคงข้อมูลที่ประมวลผลแล้วไว้ หากยังไม่มี คุณก็สร้างได้ในโปรเจ็กต์ GCP

ป้อนตัวแปรสภาพแวดล้อม

ในเทอร์มินัลที่คุณเรียกใช้ gcloud CLI ให้ป้อนตัวแปรสภาพแวดล้อมเหล่านี้เพื่อให้ใช้ได้ในภายหลัง

export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>

แทนที่ค่าต่อไปนี้

  • <project-id> โดยใช้ชื่อโปรเจ็กต์ GCP ที่คุณตั้งค่าไว้
  • <project-number> ด้วยชื่อของหมายเลขโปรเจ็กต์จากขั้นตอนที่ 1 ของข้อกำหนดเบื้องต้น
  • <region> โดยมีชื่อภูมิภาคจากภูมิภาคและโซนที่พร้อมใช้งานที่คุณต้องการใช้ เช่น เราสามารถใช้ us-central1 ได้
  • <zone> โดยใช้ชื่อโซนจากภูมิภาคและโซนที่พร้อมให้บริการในภูมิภาคที่คุณเลือกไว้ก่อนหน้านี้ เช่น หากเลือก us-central1 เป็นภูมิภาค คุณจะใช้ us-central1-f เป็นโซนได้ ระบบจะใช้โซนนี้เพื่อสร้าง GCE VM ที่จำลองอุปกรณ์ IoT ตรวจสอบว่าโซนอยู่ในภูมิภาคที่คุณเลือก
  • <subnet-path> โดยมีเส้นทางแบบเต็มของซับเน็ตจากขั้นตอนที่ 2 ของข้อกำหนดเบื้องต้น ค่านี้ต้องอยู่ในรูปแบบ projects/<project-id>/regions/<region>/subnetworks/<subnet-name>
  • <bucket-name> โดยใช้ชื่อของ Bucket ของ GCS จากขั้นตอนที่ 3 ของข้อกำหนดเบื้องต้น

3. ตั้งค่า Kafka ที่มีการจัดการของ Google

ส่วนนี้จะตั้งค่าคลัสเตอร์ Kafka ที่มีการจัดการของ Google ซึ่งจะทำให้ใช้งานได้เซิร์ฟเวอร์ Kafka และสร้างหัวข้อในเซิร์ฟเวอร์นี้ที่สามารถเผยแพร่และอ่านข้อมูล IoT ได้หลังจากสมัครใช้บริการ DemoIOT Solutions สามารถตั้งค่าคลัสเตอร์นี้เพื่อให้อุปกรณ์ทั้งหมดเผยแพร่ข้อมูลไปยังคลัสเตอร์

สร้างคลัสเตอร์ Kafka ที่มีการจัดการ

  • สร้างคลัสเตอร์ Kafka ที่มีการจัดการ ในที่นี้ ชื่อของคลัสเตอร์คือ kafka-iot
gcloud managed-kafka clusters create kafka-iot \
    --project=$PROJECT_ID \
    --location=$REGION \
    --cpu=3 \
    --memory=12GiB \
    --subnets=$SUBNET_PATH \
    --auto-rebalance

คุณควรได้รับการตอบกลับที่คล้ายกับข้อความต่อไปนี้

Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.                                         
Created cluster [kafka-iot].

การสร้างคลัสเตอร์จะใช้เวลา 20-30 นาที รอให้การดำเนินการนี้เสร็จสมบูรณ์

สร้างหัวข้อ

  • สร้างหัวข้อ Kafka ที่มีการจัดการในคลัสเตอร์ ในที่นี้ ชื่อของหัวข้อคือ kafka-iot-topic
gcloud managed-kafka topics create kafka-iot-topic \
    --cluster=kafka-iot \
    --location=$REGION \
    --partitions=10 \
    --replication-factor=3

คุณควรได้รับเอาต์พุตที่คล้ายกับเอาต์พุตต่อไปนี้

Created topic [kafka-iot-topic].

4. ตั้งค่าผู้เผยแพร่โฆษณา

หากต้องการเผยแพร่ไปยังคลัสเตอร์ Kafka ที่มีการจัดการ เราจะตั้งค่าอินสแตนซ์ VM ของ Google Compute Engine ที่เข้าถึง VPC ซึ่งมีเครือข่ายย่อยที่คลัสเตอร์ Kafka ที่มีการจัดการใช้ได้ VM นี้จำลองอุปกรณ์เซ็นเซอร์ที่ DemoIOT Solutions จัดหาให้

ขั้นตอน

  1. สร้างอินสแตนซ์ VM ของ Google Compute Engine ในที่นี้ ชื่อของ GCE VM คือ publisher-instance
gcloud compute instances create publisher-instance \
    --scopes=https://www.googleapis.com/auth/cloud-platform \
    --subnet=$SUBNET_PATH \
    --zone=$ZONE
  1. ให้สิทธิ์แก่บัญชีบริการเริ่มต้นของ Google Compute Engine ในการใช้บริการที่มีการจัดการสำหรับ Apache Kafka
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role=roles/managedkafka.client
  1. ใช้ SSH เพื่อเชื่อมต่อกับ VM หรือใช้คอนโซล Google Cloud เพื่อ SSH
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
  1. ติดตั้ง Java เพื่อเรียกใช้เครื่องมือบรรทัดคำสั่ง Kafka และดาวน์โหลดไบนารี Kafka โดยใช้คำสั่งต่อไปนี้
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz  https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
  1. ดาวน์โหลดไลบรารีการตรวจสอบสิทธิ์ Kafka ที่มีการจัดการและทรัพยากร Dependency แล้วกำหนดค่าพร็อพเพอร์ตี้ของไคลเอ็นต์ Kafka
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF

ดูรายละเอียดเพิ่มเติมเกี่ยวกับการตั้งค่าเครื่องของผู้เผยแพร่โฆษณาได้ที่ตั้งค่าเครื่องไคลเอ็นต์

5. เผยแพร่ไปยัง Kafka ที่มีการจัดการ

เมื่อตั้งค่าผู้เผยแพร่แล้ว เราจะใช้บรรทัดคำสั่ง Kafka ในผู้เผยแพร่เพื่อเผยแพร่ข้อมูลจำลองจาก VM ของ GCE (จำลองอุปกรณ์ IoT โดย DemoIOT Solutions) ไปยังคลัสเตอร์ Kafka ที่มีการจัดการได้

  1. เนื่องจากเรา SSH เข้าสู่อินสแตนซ์ VM ของ GCE แล้ว เราจึงต้องป้อนข้อมูลตัวแปร PROJECT_ID อีกครั้ง
export PROJECT_ID=<project-id>
export REGION=<region>

แทนที่ค่าต่อไปนี้

  • <project-id> โดยใช้ชื่อโปรเจ็กต์ GCP ที่คุณตั้งค่าไว้
  • <region> พร้อมภูมิภาคที่สร้างคลัสเตอร์ Kafka
  1. ใช้คำสั่ง managed-kafka clusters describe เพื่อรับที่อยู่ IP ของเซิร์ฟเวอร์เริ่มต้นของ Kafka คุณใช้ที่อยู่นี้เพื่อเชื่อมต่อกับคลัสเตอร์ Kafka ได้
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. แสดงรายการหัวข้อในคลัสเตอร์
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties

คุณควรเห็นเอาต์พุตต่อไปนี้ซึ่งมีหัวข้อ kafka-iot-topic ที่เราสร้างไว้ก่อนหน้านี้

__remote_log_metadata
kafka-iot-topic
  1. คัดลอกสคริปต์นี้แล้ววางลงในไฟล์ใหม่ publish_iot_data.sh หากต้องการสร้างไฟล์ใหม่ใน GCE VM คุณสามารถใช้เครื่องมืออย่าง vim หรือ nano
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash

NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10

generate_sensor_data() {
  local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
  local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
  local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
  local light_level=$((RANDOM % 1000))

  echo "\"temperature\": $temperature,"
  echo "\"humidity\": $humidity,"
  echo "\"pressure\": $pressure,"
  echo "\"light_level\": $light_level"
}

generate_location_data() {
  local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
  local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))

  echo "\"latitude\": $latitude,"
  echo "\"longitude\": $longitude"
}

generate_device_status() {
  local battery_level=$((RANDOM % 101))
  local signal_strength=$((RANDOM % 80 - 100))
  local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
  local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"

  echo "\"battery_level\": $battery_level,"
  echo "\"signal_strength\": $signal_strength,"
  echo "\"connection_type\": \"$connection_type\""
}

publish_to_kafka() {
  local device_index=$1
  local message_index=$2
  local device_id="sensor-$((device_index % NUM_IDS))"
  local timestamp=$((start_time + (message_index * message_interval)))
  local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")

  local json_data=$(cat <<EOF
{
  "device_id": "$device_id",
  "timestamp": "$date",
  "location": {
$(generate_location_data)
  },
  "sensor_data": {
$(generate_sensor_data)
  },
  "device_status": {
$(generate_device_status)
  },
  "metadata": {
    "sensor_type": "environmental",
    "unit_temperature": "Celsius",
    "unit_humidity": "%" ,
    "unit_pressure": "hPa",
    "unit_light_level": "lux",
    "firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
  }
}
EOF
)

  echo $json_data | jq -rc
}

for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
  for device_index in $(seq 0 $((NUM_IDS - 1))); do
    publish_to_kafka "$device_index" "$message_index"
  done
done | kafka-console-producer.sh \
    --topic kafka-iot-topic \
    --bootstrap-server $1 \
    --producer.config $2

คำอธิบาย

  • สคริปต์นี้สร้างข้อความ JSON ที่มีค่าเซ็นเซอร์จำลองซึ่งมีรหัสอุปกรณ์ การประทับเวลา ข้อมูลเซ็นเซอร์ (อุณหภูมิ ความชื้น ความดัน แสง) ข้อมูลตำแหน่ง (ละติจูด ลองจิจูด) สถานะอุปกรณ์ (แบตเตอรี่ สัญญาณ ประเภทการเชื่อมต่อ) และข้อมูลเมตาบางอย่าง
  • โดยจะสร้างการไหลของข้อความอย่างต่อเนื่องจากอุปกรณ์ที่ไม่ซ้ำกันจำนวนหนึ่ง ซึ่งแต่ละอุปกรณ์จะส่งข้อมูลในช่วงเวลาที่กำหนด โดยจำลองอุปกรณ์ IoT ในโลกแห่งความเป็นจริง ในที่นี้ เราเผยแพร่ข้อมูลจากอุปกรณ์ 10 เครื่องซึ่งให้ค่าอ่าน 20 ค่าต่อเครื่อง โดยมีช่วงเวลา 10 วินาที
  • นอกจากนี้ยังเผยแพร่ข้อมูลที่สร้างขึ้นทั้งหมดไปยังหัวข้อ Kafka โดยใช้เครื่องมือบรรทัดคำสั่งของ Kafka Producer
  1. ติดตั้งการขึ้นต่อกันบางอย่างที่สคริปต์ใช้ - แพ็กเกจ bc สำหรับการคำนวณทางคณิตศาสตร์ และแพ็กเกจ jq สำหรับการประมวลผล JSON
sudo apt-get install bc jq
  1. แก้ไขสคริปต์ให้เป็นไฟล์ที่เรียกใช้งานได้ แล้วเรียกใช้สคริปต์ โดยจะใช้เวลาประมาณ 2 นาที
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties

คุณตรวจสอบได้ว่าเผยแพร่เหตุการณ์สําเร็จหรือไม่โดยเรียกใช้คําสั่งนี้ ซึ่งจะพิมพ์เหตุการณ์ทั้งหมด กด <control-c> เพื่อออก

kafka-console-consumer.sh \
    --topic kafka-iot-topic \
    --from-beginning \
    --bootstrap-server $BOOTSTRAP \
    --consumer.config client.properties

6. ตั้งค่าคลัสเตอร์ Dataproc

ส่วนนี้จะสร้างคลัสเตอร์ Dataproc ในเครือข่ายย่อย VPC ที่มีคลัสเตอร์ Kafka ที่มีการจัดการ คลัสเตอร์นี้จะใช้เพื่อเรียกใช้งานที่สร้างสถิติและข้อมูลเชิงลึกแบบเรียลไทม์ที่ DemoIOT Solutions ต้องการ

  1. สร้างคลัสเตอร์ Dataproc ในที่นี้ชื่อของคลัสเตอร์คือ dataproc-iot
gcloud dataproc clusters create dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION \
    --image-version=2.2-debian12 \
    --enable-component-gateway \
    --subnet=$SUBNET_PATH \
    --master-machine-type=n1-standard-4 \
    --worker-machine-type=n1-standard-4 \
    --num-workers=2 \
    --properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G

คุณควรได้รับการตอบกลับที่คล้ายกับข้อความต่อไปนี้

Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.                                                                                                                                                                      
Waiting for cluster creation operation...done.                                                                                                                                                                    
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].

การสร้างคลัสเตอร์อาจใช้เวลา 10-15 นาที รอให้การดำเนินการนี้เสร็จสมบูรณ์และตรวจสอบว่าคลัสเตอร์อยู่ในสถานะ RUNNING โดยการอธิบายคลัสเตอร์

gcloud dataproc clusters describe dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION

7. ประมวลผลข้อความ Kafka โดยใช้ Dataproc

ในส่วนสุดท้ายนี้ คุณจะส่งงาน Dataproc ที่ประมวลผลข้อความที่เผยแพร่โดยใช้ Spark Streaming งานนี้จะสร้างสถิติและข้อมูลเชิงลึกแบบเรียลไทม์ที่ DemoIOT Solutions สามารถนำไปใช้ได้

  1. เรียกใช้คำสั่งนี้เพื่อสร้างไฟล์งาน PySpark สำหรับการสตรีมที่ชื่อ process_iot.py ในเครื่อง
cat > process_iot.py <<EOF

#!/bin/python

import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException

JSON_SCHEMA = StructType([
    StructField("device_id", StringType()),
    StructField("timestamp", StringType()),
    StructField(
        "location",
        StructType([
            StructField("latitude", FloatType()),
            StructField("longitude", FloatType()),
        ]),
    ),
    StructField(
        "sensor_data",
        StructType([
            StructField("temperature", FloatType()),
            StructField("humidity", FloatType()),
            StructField("pressure", FloatType()),
            StructField("light_level", IntegerType()),
        ]),
    ),
    StructField(
        "device_status",
        StructType([
            StructField("battery_level", IntegerType()),
            StructField("signal_strength", IntegerType()),
            StructField("connection_type", StringType()),
        ]),
    ),
    StructField(
        "metadata",
        StructType([
            StructField("sensor_type", StringType()),
            StructField("unit_temperature", StringType()),
            StructField("unit_humidity", StringType()),
            StructField("unit_pressure", StringType()),
            StructField("unit_light_level", StringType()),
            StructField("firmware_version", StringType()),
        ]),
    ),
])
CLIENT_PROPERTY_KEYS = [
    "security.protocol",
    "sasl.mechanism",
    "sasl.login.callback.handler.class",
    "sasl.jaas.config",
]


def get_client_properties(client_properties_path: str):
    # Parse client properties file
    parsed_path = urlparse(client_properties_path)
    if parsed_path.scheme != "gs":
        raise ValueError("Invalid GCS path for client properties. Must start with gs://.")

    bucket_name = parsed_path.netloc
    blob_name = parsed_path.path.lstrip("/")

    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    file_content = "[DEFAULT]\n" + blob.download_as_text()

    config = configparser.ConfigParser()
    config.read_string(file_content)

    client_properties = dict()
    for key in CLIENT_PROPERTY_KEYS:
        client_properties[key] = config.get("DEFAULT", key)

    print(f"Client properties: {client_properties}")
    return client_properties

def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
    print("Starting initial data processing...")
    initial_rows = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_server_address)
        .option("startingOffsets", "earliest")
        .option("subscribe", "kafka-iot-topic")
        .option("kafka.security.protocol", client_properties["security.protocol"])
        .option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
        .option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
        .option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
        .load()
    )

    initial_rows = (
        initial_rows.selectExpr("CAST(value AS STRING)")
        .select(from_json(col("value"), JSON_SCHEMA).alias("data"))
        .select("data.*")
    )
    
    # Print first 20 rows
    def print_top_rows(batch_df, batch_id):
        if batch_df.count() > 0:
            print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
            batch_df.limit(20).show(truncate=False)

    initial_query_print = initial_rows.writeStream \
        .foreachBatch(print_top_rows) \
        .trigger(once=True) \
        .start()
    queries_to_await.append(initial_query_print)

    # Calculate and print average temperatures
    def process_initial_avg_temp(batch_df, batch_id):
        if batch_df.count() > 0:
            print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
            current_averages = (
                batch_df.select("device_id", "sensor_data.temperature")
                .groupBy("device_id")
                .agg(avg("temperature").alias("average_temperature"))
                .orderBy("device_id")
            )
            current_averages.show(truncate=False)

    initial_query_avg_temp = initial_rows.writeStream \
        .foreachBatch(process_initial_avg_temp) \
        .trigger(once=True) \
        .start()
    queries_to_await.append(initial_query_avg_temp)
    
    # Write data to GCS
    initial_data_gcs_writer = (
        initial_rows.writeStream.outputMode("append")
        .format("avro")
        .option("path", store_data_gcs_path+"/tables/iot_avro/")
        .option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
        .trigger(once=True) \
        .start()
    )
    queries_to_await.append(initial_data_gcs_writer)


def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):

    client_properties = get_client_properties(client_properties_path)

    # Create SparkSession
    spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
    
    queries_to_await = []

    process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
    
    # Wait for all queries to terminate
    for query in queries_to_await:
        try:
            query.awaitTermination()
        except Exception as e:
            print(f"Error awaiting query: {e}")
        finally:
            query.stop()

    spark.stop()

if __name__ == "__main__":
    if len(sys.argv) < 4:
        print("Invalid number of arguments passed ", len(sys.argv))
        print(
            "Usage: ",
            sys.argv[0],
            " <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
        )
        raise ValueError("Invalid number of arguments passed.")

    parsed_data_path = urlparse(sys.argv[3])
    if parsed_data_path.scheme != "gs":
        raise ValueError("Invalid GCS path for storing data. Must start with gs://.")


    main(sys.argv[1], sys.argv[2], sys.argv[3])


EOF

คำอธิบาย

  • โค้ดนี้จะตั้งค่างาน Structured Streaming ของ PySpark เพื่ออ่านข้อมูลจากหัวข้อ Kafka ที่ระบุ โดยจะใช้ที่อยู่ Bootstrap ของเซิร์ฟเวอร์ Kafka ที่ระบุและการกำหนดค่า Kafka ที่โหลดจากไฟล์การกำหนดค่า GCS เพื่อเชื่อมต่อและตรวจสอบสิทธิ์กับ Kafka Broker
  • โดยจะอ่านข้อมูลดิบจาก Kafka เป็นสตรีมของอาร์เรย์ไบต์ก่อน จากนั้นจะแปลงอาร์เรย์ไบต์เหล่านั้นเป็นสตริง และใช้ json_schema โดยใช้ StructType ของ Spark เพื่อระบุโครงสร้างของข้อมูล (รหัสอุปกรณ์, การประทับเวลา, สถานที่, ข้อมูลเซ็นเซอร์ ฯลฯ)
  • โดยจะพิมพ์ 10 แถวแรกไปยังคอนโซลเพื่อแสดงตัวอย่าง คำนวณอุณหภูมิเฉลี่ยต่อเซ็นเซอร์ และเขียนข้อมูลทั้งหมดไปยังที่เก็บข้อมูล GCS ในรูปแบบ avro Avro เป็นระบบการซีเรียลไลซ์ข้อมูลแบบแถวซึ่งจัดเก็บ Structured Data อย่างมีประสิทธิภาพในรูปแบบไบนารีที่กำหนดสคีมาแบบกะทัดรัด โดยมีวิวัฒนาการของสคีมา ความเป็นกลางของภาษา และการบีบอัดสูงสำหรับการประมวลผลข้อมูลขนาดใหญ่
  1. สร้างไฟล์ client.properties และป้อนตัวแปรสภาพแวดล้อมสำหรับที่อยู่ Bootstrap ของเซิร์ฟเวอร์ Kafka
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. อัปโหลดไฟล์ process_iot.py และ client.properties ไปยังที่เก็บข้อมูล Google Cloud Storage เพื่อให้งาน Dataproc ใช้ไฟล์ดังกล่าวได้
gsutil cp process_iot.py client.properties $BUCKET
  1. คัดลอก JAR ของการอ้างอิงบางรายการสำหรับงาน Dataproc ไปยัง Bucket GCS ไดเรกทอรีนี้มีไฟล์ JAR ที่จำเป็นต่อการเรียกใช้ Spark Streaming Jobs ด้วย Kafka รวมถึงไลบรารีการตรวจสอบสิทธิ์ Kafka ที่มีการจัดการและทรัพยากร Dependency ของไลบรารีดังกล่าว ซึ่งนำมาจากตั้งค่าเครื่องไคลเอ็นต์
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
  1. ส่งงาน Spark ไปยังคลัสเตอร์ Dataproc
gcloud dataproc jobs submit pyspark \
    $BUCKET/process_iot.py \
    --project=$PROJECT_ID \
    --region=$REGION \
    --cluster=dataproc-iot \
    --properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
    -- $BOOTSTRAP $BUCKET/client.properties $BUCKET

ระบบจะพิมพ์บันทึกของไดรเวอร์ Spark นอกจากนี้ คุณควรจะเห็นตารางเหล่านี้ที่บันทึกไว้ในคอนโซลและข้อมูลที่จัดเก็บไว้ใน Bucket ของ GCS

25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp           |location        |sensor_data              |device_status       |metadata                                    |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+

25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954  |
|sensor-1 |20.475             |
|sensor-2 |20.475             |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138  |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422  |
|sensor-9 |20.405000019073487 |
+---------+-------------------+

8. ล้างข้อมูล

ทำตามขั้นตอนเพื่อล้างข้อมูลทรัพยากรหลังจากทำ Codelab เสร็จแล้ว

  1. ลบคลัสเตอร์ Kafka ที่มีการจัดการ, VM ของ GCE ที่เป็นผู้เผยแพร่ และคลัสเตอร์ Dataproc
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
  1. ลบเครือข่ายย่อยและเครือข่าย VPC
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
  1. ลบบัคเก็ต GCS หากไม่ต้องการใช้ข้อมูลแล้ว
gcloud storage rm --recursive $BUCKET

9. ขอแสดงความยินดี

ขอแสดงความยินดี คุณสร้างไปป์ไลน์การประมวลผลข้อมูล IoT ด้วย Manage Kafka และ Dataproc ได้สำเร็จ ซึ่งจะช่วยให้ DemoIOT Solutions ได้รับข้อมูลเชิงลึกแบบเรียลไทม์เกี่ยวกับข้อมูลที่อุปกรณ์เผยแพร่

คุณสร้างคลัสเตอร์ Kafka ที่มีการจัดการ เผยแพร่เหตุการณ์ IoT ไปยังคลัสเตอร์ และเรียกใช้ชิ้นงาน Dataproc ที่ใช้การสตรีม Spark เพื่อประมวลผลเหตุการณ์เหล่านี้แบบเรียลไทม์ ตอนนี้คุณทราบขั้นตอนสำคัญที่จำเป็นในการสร้างไปป์ไลน์ข้อมูลโดยใช้ Kafka ที่มีการจัดการและ Dataproc แล้ว

เอกสารอ้างอิง