معالجة بيانات إنترنت الأشياء في الوقت الفعلي باستخدام Dataproc وGoogle Managed Service for Apache Kafka

معالجة بيانات إنترنت الأشياء في الوقت الفعلي باستخدام Dataproc و"خدمة Google المُدارة لـ Apache Kafka"

لمحة عن هذا الدرس التطبيقي حول الترميز

subjectتاريخ التعديل الأخير: يونيو 16, 2025
account_circleتأليف: Devanshi Khatsuriya

1. مقدمة

2efacab8643a653b.png

تاريخ آخر تعديل: 10‏-06‏-2024

الخلفية

تُنشئ أجهزة إنترنت الأشياء (IoT)، بدءًا من حلول المنازل الذكية ووصولاً إلى أجهزة الاستشعار الصناعية، كميات هائلة من البيانات عند أطراف الشبكة. هذه البيانات قيّمة لمجموعة متنوعة من حالات الاستخدام، مثل مراقبة الأجهزة وتتبُّعها وإجراء التشخيصات ومراقبة الأداء والتخصيص وتحسين أسطول المركبات وغير ذلك الكثير. توفّر "خدمة Google المُدارة لـ Apache Kafka" طريقة قابلة للتطوير وطويلة الأمد لاستيراد هذا التدفق المستمر من البيانات وتخزينه بطريقة آمنة وسهلة الاستخدام ومتوافقة مع البرامج مفتوحة المصدر، في حين تسمح Google Cloud Dataproc بمعالجة هذه المجموعات الكبيرة من البيانات لأغراض تحليل البيانات باستخدام مجموعات Apache Spark وHadoop.

التطبيق الذي ستصممه

في هذا الدليل التعليمي حول رموز البرامج، ستُنشئ مسارًا متسلسلًا لمعالجة بيانات إنترنت الأشياء باستخدام "خدمة Google المُدارة لـ Apache Kafka" وDataproc وPython وApache Spark التي تُجري تحليلات في الوقت الفعلي. سيؤدي مسار الإحالة الناجحة إلى ما يلي:

  • نشر البيانات من أجهزة إنترنت الأشياء إلى مجموعة Kafka مُدارة باستخدام أجهزة GCE الافتراضية
  • بث البيانات من مجموعة Manage Kafka إلى مجموعة Dataproc
  • معالجة البيانات باستخدام مهمة Dataproc Spark Streaming

ما ستتعرّف عليه

  • كيفية إنشاء مجموعات Google Managed Kafka وDataproc
  • كيفية تشغيل مهام البث باستخدام Dataproc

المتطلبات

2. نظرة عامة

في هذا الدرس التطبيقي حول الترميز، لنتابع قصة شركة وهمية، وهي DemoIOT Solutions. توفّر شركة DemoIOT Solutions أجهزة استشعار تقيس بيانات درجة الحرارة والرطوبة والضغط ومستوى الإضاءة والموقع الجغرافي وتنقلها. يريدون إعداد مسارات معالجة تعالج هذه البيانات لعرض إحصاءات في الوقت الفعلي للعملاء. باستخدام هذه القنوات، يمكنهم تقديم مجموعة كبيرة من الخدمات إلى عملائهم، مثل المراقبة والاقتراحات المبرمَجة والتنبيهات والإحصاءات عن الأماكن التي ثبَّت فيها العملاء أجهزة الاستشعار.

لإجراء ذلك، سنستخدم جهاز GCE الظاهري لمحاكاة جهاز إنترنت الأشياء. سينشر الجهاز البيانات في موضوع Kafka في مجموعة Kafka المُدارة من Google، والتي ستتم قراءتها ومعالجتها من خلال وظيفة البث في Dataproc. سيؤدّي إعداد المتطلّبات الأساسية والصفحات التالية إلى تنفيذ كل هذه الخطوات.

إعداد المتطلّبات الأساسية

  1. ابحث عن اسم المشروع ورقمه. راجِع العثور على اسم المشروع ورقمه ورقم تعريفه للاطّلاع على مزيد من المعلومات.
  2. شبكة VPC الفرعية سيتيح ذلك إمكانية الاتصال بين الجهاز الظاهري على GCE ومجموعة Kafka ومجموعة Dataproc. اتّبِع هذه الخطوات لعرض الشبكات الفرعية الحالية باستخدام gcloud CLI. اتّبِع خطوات إنشاء شبكة VPC في الوضع التلقائي، إذا لزم الأمر، ما سيؤدي إلى إنشاء شبكة VPC تتضمّن شبكة فرعية في كل منطقة من مناطق Google Cloud. ومع ذلك، سنستخدم شبكة فرعية من منطقة واحدة فقط لأغراض هذا الدليل التعليمي.
  • في هذه الشبكة الفرعية، تأكَّد من توفُّر قاعدة جدار حماية تسمح بجميع عمليات الدخول من tcp:22، وهو بروتوكول النقل الآمن (SSH) المطلوب. ستتوفّر هذه القاعدة للاختيار ضمن قسم "قواعد جدار الحماية" عند إنشاء شبكة، لذا تأكَّد من اختيارها.
  1. حزمة GCS ستحتاج إلى إذن الوصول إلى حزمة تخزين في Google Cloud لتخزين موارد مهام Dataproc وحفظ البيانات التي تمت معالجتها. وإذا لم يكن لديك حساب، يمكنك إنشاء حساب في مشروعك على Google Cloud Platform.

ملء متغيّرات البيئة

في المحطة الطرفية التي تُشغِّل فيها gcloud CLI، املأ متغيّرات البيئة هذه لكي يتمكّن مستخدم آخر من استخدامها لاحقًا.

export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>

غيِّر القيم في السلسلة على الشكل التالي:

  • <project-id> باسم مشروع Google Cloud Platform الذي أعددته.
  • <project-number> باسم رقم المشروع من الخطوة 1 من المتطلّبات الأساسية.
  • <region> مع اسم منطقة من المناطق والمناطق المتاحة التي تريد استخدامها. على سبيل المثال، يمكننا استخدام us-central1.
  • <zone> مع اسم المنطقة من المناطق والمناطق المتاحة ضمن المنطقة التي اخترتها سابقًا. على سبيل المثال، إذا اخترت us-central1 كمنطقة، يمكنك استخدام us-central1-f كمنطقة. سيتم استخدام هذه المنطقة لإنشاء جهاز GCE الظاهري الذي يحاكي أجهزة إنترنت الأشياء. تأكَّد من أنّ المنطقة ضمن المنطقة التي اخترتها.
  • <subnet-path> مع المسار الكامل للشبكة الفرعية من الخطوة 2 من المتطلبات الأساسية. يجب أن تكون قيمة هذا الحقل بالتنسيق: projects/<project-id>/regions/<region>/subnetworks/<subnet-name>.
  • <bucket-name> مع اسم حزمة GCS من الخطوة 3 من المتطلّبات الأساسية.

3. إعداد Google Managed Kafka

يُعدّ هذا القسم مجموعة Kafka مُدارة من Google، والتي تنشر خادم Kafka، وتُنشئ موضوعًا على هذا الخادم يمكن من خلاله نشر بيانات إنترنت الأشياء وقراءتها بعد الاشتراك فيها. يمكن لشركة DemoIOT Solutions إعداد هذه المجموعة لكي تنشر جميع أجهزتها البيانات فيها.

إنشاء مجموعة Kafka مُدارة

  • أنشئ مجموعة Kafka مُدارة. في ما يلي اسم المجموعة kafka-iot.
gcloud managed-kafka clusters create kafka-iot \
   
--project=$PROJECT_ID \
   
--location=$REGION \
   
--cpu=3 \
   
--memory=12GiB \
   
--subnets=$SUBNET_PATH \
   
--auto-rebalance

من المفترض أن يظهر لك ردّ مشابه لما يلي:

Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.                                        
Created cluster [kafka-iot].

يستغرق إنشاء المجموعة من 20 إلى 30 دقيقة. انتظِر حتى تكتمل هذه العملية.

إنشاء موضوع

  • أنشئ موضوع Kafka مُدارًا على المجموعة. في هذه الحالة، اسم الموضوع هو kafka-iot-topic.
gcloud managed-kafka topics create kafka-iot-topic \
   
--cluster=kafka-iot \
   
--location=$REGION \
   
--partitions=10 \
   
--replication-factor=3

من المفترض أن يظهر لك ناتج مشابه لما يلي:

Created topic [kafka-iot-topic].

4. إعداد حساب ناشر

للنشر في مجموعة Kafka المُدارة، أعددنا مثيلًا لجهاز افتراضي على Google Compute Engine يمكنه الوصول إلى شبكة VPC التي تحتوي على الشبكة الفرعية المستخدَمة من قِبل مجموعة Kafka المُدارة. تحاكي هذه الآلة الافتراضية أجهزة الاستشعار التي تقدّمها شركة DemoIOT Solutions.

الخطوات

  1. أنشئ مثيل جهاز افتراضي على Google Compute Engine. في ما يلي اسم الجهاز الافتراضي على Google Compute Engine‏: publisher-instance.
gcloud compute instances create publisher-instance \
   
--scopes=https://www.googleapis.com/auth/cloud-platform \
   
--subnet=$SUBNET_PATH \
   
--zone=$ZONE
  1. امنح حساب الخدمة التلقائي في Google Compute Engine الأذونات اللازمة لاستخدام "خدمة مُدارة لـ Apache Kafka".
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role=roles/managedkafka.client
  1. استخدِم بروتوكول النقل الآمن (SSH) للاتصال بالجهاز الافتراضي. بدلاً من ذلك، يمكنك استخدام Google Cloud Console للاتصال عبر بروتوكول النقل الآمن (SSH).
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
  1. ثبِّت Java لتشغيل أدوات سطر أوامر Kafka، ثم نزِّل ملف Kafka الثنائي باستخدام هذه الأوامر.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz  https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
  1. نزِّل مكتبة مصادقة "خدمات مُدارة على Kafka" ومواردها التابعة، واضبط سمات برنامج عميل Kafka.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security
.protocol=SASL_SSL
sasl
.mechanism=OAUTHBEARER
sasl
.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl
.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF

لمزيد من التفاصيل حول إعداد جهاز الناشر، يُرجى الاطّلاع على مقالة إعداد جهاز عميل.

5. النشر في Managed Kafka

بعد إعداد الناشر، يمكننا استخدام سطر أوامر Kafka عليه لنشر بعض البيانات الوهمية من جهاز GCE الظاهري (لمحاكاة أجهزة إنترنت الأشياء من خلال DemoIOT Solutions) إلى مجموعة Kafka المُدارة.

  1. بما أنّنا استخدمنا بروتوكول SSH للوصول إلى مثيل GCE VM، علينا إعادة تعبئة متغيّر PROJECT_ID:
export PROJECT_ID=<project-id>
export REGION=<region>

غيِّر القيم في السلسلة على الشكل التالي:

  • <project-id> باسم مشروع Google Cloud Platform الذي أعددته.
  • <region> مع المنطقة التي تم إنشاء مجموعة Kafka فيها
  1. استخدِم الأمر managed-kafka clusters describe للحصول على عنوان IP لخادم Kafka bootstrap. يمكن استخدام هذا العنوان للربط بمجموعة Kafka.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. اعرض المواضيع في المجموعة:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties

من المفترض أن يظهر لك الناتج التالي الذي يتضمّن الموضوع kafka-iot-topic الذي أنشأناه سابقًا.

__remote_log_metadata
kafka
-iot-topic
  1. انسخ النص البرمجي هذا والصقه في ملف جديد publish_iot_data.sh. لإنشاء ملف جديد على جهاز GCE الظاهري، يمكنك استخدام أداة مثل vim أو nano.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data
.sh
#!/bin/bash

NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10

generate_sensor_data
() {
  local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
  local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
  local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
  local light_level=$((RANDOM % 1000))

  echo "\"temperature\": $temperature,"
  echo "\"humidity\": $humidity,"
  echo "\"pressure\": $pressure,"
  echo "\"light_level\": $light_level"
}

generate_location_data
() {
  local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
  local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))

  echo "\"latitude\": $latitude,"
  echo "\"longitude\": $longitude"
}

generate_device_status
() {
  local battery_level=$((RANDOM % 101))
  local signal_strength=$((RANDOM % 80 - 100))
  local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
  local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"

  echo "\"battery_level\": $battery_level,"
  echo "\"signal_strength\": $signal_strength,"
  echo "\"connection_type\": \"$connection_type\""
}

publish_to_kafka
() {
  local device_index=$1
  local message_index=$2
  local device_id="sensor-$((device_index % NUM_IDS))"
  local timestamp=$((start_time + (message_index * message_interval)))
  local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")

  local json_data=$(cat <<EOF
{
  "device_id": "$device_id",
  "timestamp": "$date",
  "location": {
$(generate_location_data)
  },
  "sensor_data": {
$(generate_sensor_data)
  },
  "device_status": {
$(generate_device_status)
  },
  "metadata": {
    "sensor_type": "environmental",
    "unit_temperature": "Celsius",
    "unit_humidity": "%" ,
    "unit_pressure": "hPa",
    "unit_light_level": "lux",
    "firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
  }
}
EOF
)

  echo $json_data | jq -rc
}

for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
  for device_index in $(seq 0 $((NUM_IDS - 1))); do
    publish_to_kafka "$device_index" "$message_index"
  done
done | kafka-console-producer.sh \
    --topic kafka-iot-topic \
    --bootstrap-server $1 \
    --producer.config $2

الشرح

  • ينشئ هذا النص البرمجي رسائل JSON تتضمّن قراءات أجهزة استشعار محاكية تتضمّن معرّف الجهاز والطابع الزمني وبيانات أجهزة الاستشعار (درجة الحرارة والرطوبة والضغط والضوء) ومعلومات الموقع الجغرافي (خط العرض وخط الطول) وحالة الجهاز (البطارية والإشارة ونوع الاتصال) وبعض البيانات الوصفية.
  • ويُنشئ هذا الاختبار تدفقًا مستمرًا من الرسائل من عدد محدّد من الأجهزة الفريدة، يُرسِل كلّ منها البيانات في فاصل زمني محدّد، ما يحاكي أجهزة إنترنت الأشياء في العالم الواقعي. ننشر هنا بيانات من 10 أجهزة تُجري 20 قياسًا لكل منها، وذلك بفواصل زمنية تبلغ 10 ثوانٍ.
  • وينشر أيضًا جميع البيانات التي تم إنشاؤها في موضوع Kafka باستخدام أداة سطر الأوامر Kafka producer.
  1. ثبِّت بعض التبعيات التي يستخدمها النص البرمجي، وهي حزمة bc للعمليات الحسابية وحزمة jq لمعالجة JSON.
sudo apt-get install bc jq
  1. عدِّل النص البرمجي ليصبح قابلاً للتنفيذ وشغِّله. من المفترض أن يستغرق تنفيذ هذا الإجراء دقيقتَين تقريبًا.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties

يمكنك التحقّق من نشر الأحداث بنجاح من خلال تنفيذ هذا الأمر الذي سيطبع جميع الأحداث. اضغط على <control-c> للخروج.

kafka-console-consumer.sh \
   
--topic kafka-iot-topic \
   
--from-beginning \
   
--bootstrap-server $BOOTSTRAP \
   
--consumer.config client.properties

6. إعداد مجموعة Dataproc

ينشئ هذا القسم مجموعة Dataproc في الشبكة الفرعية لشبكة VPC التي تتوفّر فيها مجموعة Kafka المُدارة. سيتم استخدام هذه المجموعة لتشغيل المهام التي تنشئ الإحصاءات والإحصاءات في الوقت الفعلي التي تحتاجها شركة DemoIOT Solutions.

  1. أنشئ مجموعة Dataproc. في ما يلي اسم المجموعة dataproc-iot.
gcloud dataproc clusters create dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION \
    --image-version=2.2-debian12 \
    --enable-component-gateway \
    --subnet=$SUBNET_PATH \
    --master-machine-type=n1-standard-4 \
    --worker-machine-type=n1-standard-4 \
    --num-workers=2 \
    --properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G

من المفترض أن يظهر لك ردّ مشابه لما يلي:

Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.                                                                                                                                                                      
Waiting for cluster creation operation...done.                                                                                                                                                                    
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].

قد يستغرق إنشاء المجموعة مدة تتراوح بين 10 و15 دقيقة. انتظِر اكتمال هذه العملية بنجاح وتأكَّد من أنّ المجموعة في الحالة RUNNING من خلال وصف المجموعة.

gcloud dataproc clusters describe dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION

7. معالجة رسائل Kafka باستخدام Dataproc

في هذا القسم الأخير، سترسل وظيفة Dataproc تعالج الرسائل المنشورة باستخدام Spark Streaming. تُنشئ هذه الوظيفة بعض الإحصاءات والإحصاءات في الوقت الفعلي التي يمكن لشركة DemoIOT Solutions استخدامها.

  1. شغِّل هذا الأمر لإنشاء ملف مهمة PySpark للبث الذي يُسمى process_iot.py على الجهاز.
cat > process_iot.py <<EOF

#!/bin/python

import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException

JSON_SCHEMA = StructType([
   
StructField("device_id", StringType()),
   
StructField("timestamp", StringType()),
   
StructField(
       
"location",
       
StructType([
           
StructField("latitude", FloatType()),
           
StructField("longitude", FloatType()),
       
]),
   
),
   
StructField(
       
"sensor_data",
       
StructType([
           
StructField("temperature", FloatType()),
           
StructField("humidity", FloatType()),
           
StructField("pressure", FloatType()),
           
StructField("light_level", IntegerType()),
       
]),
   
),
   
StructField(
       
"device_status",
       
StructType([
           
StructField("battery_level", IntegerType()),
           
StructField("signal_strength", IntegerType()),
           
StructField("connection_type", StringType()),
       
]),
   
),
   
StructField(
       
"metadata",
       
StructType([
           
StructField("sensor_type", StringType()),
           
StructField("unit_temperature", StringType()),
           
StructField("unit_humidity", StringType()),
           
StructField("unit_pressure", StringType()),
           
StructField("unit_light_level", StringType()),
           
StructField("firmware_version", StringType()),
       
]),
   
),
])
CLIENT_PROPERTY_KEYS = [
   
"security.protocol",
   
"sasl.mechanism",
   
"sasl.login.callback.handler.class",
   
"sasl.jaas.config",
]


def get_client_properties(client_properties_path: str):
   
# Parse client properties file
   
parsed_path = urlparse(client_properties_path)
   
if parsed_path.scheme != "gs":
       
raise ValueError("Invalid GCS path for client properties. Must start with gs://.")

   
bucket_name = parsed_path.netloc
   
blob_name = parsed_path.path.lstrip("/")

   
client = storage.Client()
   
bucket = client.bucket(bucket_name)
   
blob = bucket.blob(blob_name)
   
file_content = "[DEFAULT]\n" + blob.download_as_text()

   
config = configparser.ConfigParser()
   
config.read_string(file_content)

   
client_properties = dict()
   
for key in CLIENT_PROPERTY_KEYS:
       
client_properties[key] = config.get("DEFAULT", key)

   
print(f"Client properties: {client_properties}")
   
return client_properties

def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
   
print("Starting initial data processing...")
   
initial_rows = (
       
spark.readStream.format("kafka")
       
.option("kafka.bootstrap.servers", bootstrap_server_address)
       
.option("startingOffsets", "earliest")
       
.option("subscribe", "kafka-iot-topic")
       
.option("kafka.security.protocol", client_properties["security.protocol"])
       
.option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
       
.option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
       
.option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
       
.load()
   
)

   
initial_rows = (
       
initial_rows.selectExpr("CAST(value AS STRING)")
       
.select(from_json(col("value"), JSON_SCHEMA).alias("data"))
       
.select("data.*")
   
)
   
   
# Print first 20 rows
   
def print_top_rows(batch_df, batch_id):
       
if batch_df.count() > 0:
           
print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
           
batch_df.limit(20).show(truncate=False)

   
initial_query_print = initial_rows.writeStream \
       
.foreachBatch(print_top_rows) \
       
.trigger(once=True) \
       
.start()
   
queries_to_await.append(initial_query_print)

   
# Calculate and print average temperatures
   
def process_initial_avg_temp(batch_df, batch_id):
       
if batch_df.count() > 0:
           
print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
           
current_averages = (
               
batch_df.select("device_id", "sensor_data.temperature")
               
.groupBy("device_id")
               
.agg(avg("temperature").alias("average_temperature"))
               
.orderBy("device_id")
           
)
           
current_averages.show(truncate=False)

   
initial_query_avg_temp = initial_rows.writeStream \
       
.foreachBatch(process_initial_avg_temp) \
       
.trigger(once=True) \
       
.start()
   
queries_to_await.append(initial_query_avg_temp)
   
   
# Write data to GCS
   
initial_data_gcs_writer = (
       
initial_rows.writeStream.outputMode("append")
       
.format("avro")
       
.option("path", store_data_gcs_path+"/tables/iot_avro/")
       
.option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
       
.trigger(once=True) \
       
.start()
   
)
   
queries_to_await.append(initial_data_gcs_writer)


def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):

   
client_properties = get_client_properties(client_properties_path)

   
# Create SparkSession
   
spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
   
   
queries_to_await = []

   
process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
   
   
# Wait for all queries to terminate
   
for query in queries_to_await:
       
try:
           
query.awaitTermination()
       
except Exception as e:
           
print(f"Error awaiting query: {e}")
       
finally:
           
query.stop()

   
spark.stop()

if __name__ == "__main__":
   
if len(sys.argv) < 4:
       
print("Invalid number of arguments passed ", len(sys.argv))
       
print(
           
"Usage: ",
           
sys.argv[0],
           
" <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
       
)
       
raise ValueError("Invalid number of arguments passed.")

   
parsed_data_path = urlparse(sys.argv[3])
   
if parsed_data_path.scheme != "gs":
       
raise ValueError("Invalid GCS path for storing data. Must start with gs://.")


   
main(sys.argv[1], sys.argv[2], sys.argv[3])


EOF

الشرح

  • يُنشئ هذا الرمز البرمجي وظيفة PySpark Structured Streaming لقراءة البيانات من موضوع Kafka محدّد. ويستخدم عنوان بدء التشغيل لخادم Kafka وإعدادات Kafka المحمَّلة من ملف إعدادات GCS للربط بوسيط Kafka والمصادقة معه.
  • أولاً، يقرأ هذا الإجراء البيانات الأولية من Kafka كبثّ من صفائف البايتات، ويحوّل صفائف البايتات هذه إلى سلاسل، ويطبّق json_schema باستخدام StructType في Spark لتحديد بنية البيانات (رقم تعريف الجهاز والطابع الزمني والموقع الجغرافي وبيانات أداة الاستشعار وما إلى ذلك).
  • تطبع هذه الوظيفة أول 10 صفوف في وحدة التحكّم لمعاينتها، وتحسب متوسّط درجة الحرارة لكلّ أداة استشعار، وتكتب جميع البيانات في حزمة GCS بتنسيق avro. ‫Avro هو نظام تسلسل بيانات يستند إلى الصفوف، ويخزّن البيانات المنظَّمة بكفاءة بتنسيق ثنائي مكثّف يحدّده المخطّط، ما يقدّم إمكانية تطوير المخطّط والحياد اللغوي وضغطًا عاليًا لمعالجة البيانات على نطاق واسع.
  1. أنشئ ملف client.properties واملأ متغيّر البيئة لعنوان bootstrap لخادم kafka.
cat <<EOF> client.properties
security
.protocol=SASL_SSL
sasl
.mechanism=OAUTHBEARER
sasl
.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl
.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. حمِّل ملفَي process_iot.py وclient.properties إلى حزمة Google Cloud Storage، حتى تتمكّن وظيفة Dataproc من استخدامهما.
gsutil cp process_iot.py client.properties $BUCKET
  1. انسخ بعض حِزم JAR للتبعيات الخاصة بوظيفتك على Dataproc إلى حزمة GCS. يحتوي هذا الدليل على حِزم جافا مطلوبة لتشغيل مهام Spark Streaming باستخدام Kafka، ومكتبة مصادقة Kafka المُدارة وتبعياتها، والتي تم الحصول عليها من إعداد جهاز عميل.
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
  1. أرسِل مهمة Spark إلى مجموعة Dataproc.
gcloud dataproc jobs submit pyspark \
    $BUCKET/process_iot.py \
    --project=$PROJECT_ID \
    --region=$REGION \
    --cluster=dataproc-iot \
    --properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
    -- $BOOTSTRAP $BUCKET/client.properties $BUCKET

ستتم طباعة سجلات برنامج تشغيل Spark. من المفترض أن تتمكّن أيضًا من الاطّلاع على هذه الجداول المسجّلة في وحدة التحكّم والبيانات المخزّنة في حزمة GCS.

25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp           |location        |sensor_data              |device_status       |metadata                                    |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+

25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954  |
|sensor-1 |20.475             |
|sensor-2 |20.475             |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138  |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422  |
|sensor-9 |20.405000019073487 |
+---------+-------------------+

8. تَنظيم

اتّبِع الخطوات لتنظيف الموارد بعد إكمال ورشة رموز البرامج.

  1. احذف مجموعة Kafka المُدارة وجهاز Publisher GCE الافتراضي ومجموعة Dataproc.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
  1. احذف الشبكة الفرعية وشبكة VPC.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
  1. يمكنك حذف حزمة GCS إذا لم تعُد تريد استخدام البيانات.
gcloud storage rm --recursive $BUCKET

9. تهانينا

مبروك، لقد نجحت في إنشاء مسار بيانات لمعالجة إنترنت الأشياء باستخدام Manage Kafka وDataproc، ما يساعد شركة DemoIOT Solutions في الحصول على إحصاءات في الوقت الفعلي عن البيانات التي تنشرها أجهزتها.

لقد أنشأت مجموعة Kafka مُدارة ونشرت أحداث إنترنت الأشياء عليها ونفّذت وظيفة Dataproc التي استخدمت Spark Streaming لمعالجة هذه الأحداث في الوقت الفعلي. لقد تعرّفت الآن على الخطوات الرئيسية المطلوبة لإنشاء قنوات بيانات باستخدام Managed Kafka وDataproc.

المستندات المرجعية