پردازش بیدرنگ داده های اینترنت اشیا با استفاده از Dataproc و سرویس مدیریت شده گوگل برای آپاچی کافکا

پردازش بیدرنگ داده های اینترنت اشیا با استفاده از Dataproc و سرویس مدیریت شده گوگل برای آپاچی کافکا

درباره این codelab

subjectآخرین به‌روزرسانی: ژوئن ۱۶, ۲۰۲۵
account_circleنویسنده: Devanshi Khatsuriya

1. مقدمه

2efacab8643a653b.png

آخرین به روز رسانی: 2024-06-10

پس زمینه

دستگاه‌های اینترنت اشیا (IoT)، از راه‌حل‌های خانه هوشمند گرفته تا حسگرهای صنعتی، حجم وسیعی از داده‌ها را در لبه شبکه تولید می‌کنند. این داده‌ها برای موارد مختلف مانند نظارت بر دستگاه، ردیابی، تشخیص، نظارت، شخصی‌سازی، بهینه‌سازی ناوگان و موارد دیگر بسیار ارزشمند هستند. Google Managed Service برای Apache Kafka روشی مقیاس‌پذیر و بادوام برای جذب و ذخیره این جریان پیوسته از داده‌ها به روشی سازگار با OSS، استفاده آسان و ایمن ارائه می‌دهد، در حالی که Google Cloud Dataproc امکان پردازش این مجموعه داده‌های بزرگ را برای تجزیه و تحلیل داده‌ها با استفاده از Apache Spark و خوشه‌های Hadoop فراهم می‌کند.

چیزی که خواهی ساخت

در این نرم افزار کد، شما قصد دارید یک خط لوله پردازش داده های اینترنت اشیا با استفاده از سرویس مدیریت شده گوگل برای Apache Kafka، Dataproc، Python و Apache Spark بسازید که تجزیه و تحلیل بلادرنگ انجام می دهد. خط لوله شما:

  • انتشار داده‌ها از دستگاه‌های IoT در یک خوشه کافکا مدیریت شده با استفاده از ماشین‌های مجازی GCE
  • داده‌ها را از خوشه Manage Kafka به یک خوشه Dataproc منتقل کنید
  • داده‌ها را با استفاده از Dataproc Spark Streaming پردازش کنید

چیزی که یاد خواهید گرفت

  • نحوه ایجاد خوشه های Kafka و Dataproc مدیریت شده توسط گوگل
  • نحوه اجرای کارهای استریم با استفاده از Dataproc

آنچه شما نیاز دارید

2. نمای کلی

برای این کد، بیایید داستان یک شرکت ساختگی، DemoIOT Solutions را دنبال کنیم. DemoIOT Solutions دستگاه های حسگری را ارائه می دهد که داده ها را برای دما، رطوبت، فشار، سطح نور و مکان اندازه گیری و انتقال می دهد. آن‌ها می‌خواهند خطوط لوله‌ای راه‌اندازی کنند که این داده‌ها را پردازش کنند تا آمار بلادرنگ را به مشتریان خود نشان دهند. با استفاده از چنین خطوط لوله، آنها می توانند خدمات متنوعی را به مشتریان خود ارائه دهند، مانند نظارت، پیشنهادات خودکار، هشدارها و بینش در مورد مکان هایی که مشتریان حسگرهای خود را در آنجا نصب کرده اند.

برای این کار از GCE VM برای شبیه سازی دستگاه اینترنت اشیا استفاده می کنیم. دستگاه داده‌ها را در یک موضوع Kafka در خوشه Google Managed Kafka منتشر می‌کند که توسط یک کار پخش Dataproc خوانده و پردازش می‌شود. تنظیمات پیش نیاز و صفحات بعدی شما را به انجام تمام این مراحل هدایت می کند.

تنظیم پیش نیاز

  1. نام پروژه و شماره پروژه را برای پروژه خود پیدا کنید. برای مرجع به یافتن نام، شماره و شناسه پروژه مراجعه کنید.
  2. زیرشبکه VPC این امکان اتصال بین GCE VM، Cluster Kafka و Dataproc را فراهم می کند. این را دنبال کنید تا زیرشبکه های موجود را با استفاده از gcloud CLI فهرست کنید. در صورت نیاز، ایجاد یک شبکه VPC حالت خودکار را دنبال کنید که یک شبکه VPC با زیرشبکه در هر منطقه Google Cloud ایجاد می کند. اگرچه، برای هدف این نرم افزار کد، ما از یک زیرشبکه فقط از یک منطقه استفاده خواهیم کرد.
  • در این زیرشبکه، اطمینان حاصل کنید که یک قانون فایروال وجود دارد که اجازه می دهد همه ورودی ها از tcp:22، که SSH مورد نیاز است، وجود داشته باشد. این قانون در هنگام ایجاد یک شبکه در بخش قوانین فایروال برای انتخاب در دسترس خواهد بود، بنابراین مطمئن شوید که آن را انتخاب کنید.
  1. سطل GCS. برای ذخیره منابع شغلی Dataproc و تداوم داده های پردازش شده، باید به یک سطل ذخیره سازی Google Cloud دسترسی داشته باشید. اگر ندارید، می توانید در پروژه GCP خود یکی ایجاد کنید .

پر کردن متغیرهای محیطی

در ترمینال خود جایی که gcloud CLI را اجرا می‌کنید، این متغیرهای محیطی را پر کنید تا بتوان بعداً از آنها استفاده کرد.

export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>

موارد زیر را جایگزین کنید:

  • <project-id> با نام پروژه GCP که تنظیم کردید.
  • <project-number> با نام شماره پروژه از مرحله پیش نیاز 1.
  • <region> با نام منطقه ای از مناطق و مناطق موجود که می خواهید استفاده کنید. به عنوان مثال، ما می توانیم us-central1 استفاده کنیم.
  • <zone> با نام منطقه از مناطق موجود و مناطق زیر منطقه ای که قبلاً انتخاب کرده اید. به عنوان مثال، اگر us-central1 به عنوان منطقه انتخاب کرده اید، می توانید us-central1-f به عنوان منطقه استفاده کنید. این منطقه برای ایجاد GCE VM که دستگاه‌های IoT را شبیه‌سازی می‌کند استفاده می‌شود. اطمینان حاصل کنید که منطقه شما در منطقه ای است که انتخاب کرده اید.
  • <subnet-path> با مسیر کامل زیرشبکه از مرحله پیش‌نیاز 2. مقدار این باید در قالب: projects/<project-id>/regions/<region>/subnetworks/<subnet-name> باشد.
  • <bucket-name> با نام سطل GCS از مرحله پیش نیاز 3.

3. Google Managed Kafka را راه اندازی کنید

این بخش یک خوشه Kafka مدیریت شده توسط گوگل را راه اندازی می کند که سرور کافکا را مستقر می کند و موضوعی را در این سرور ایجاد می کند که در آن داده های اینترنت اشیا پس از اشتراک در آن قابل انتشار و خواندن باشد. DemoIOT Solutions می‌تواند این خوشه را طوری تنظیم کند که همه دستگاه‌هایشان داده‌ها را در آن منتشر کنند.

یک خوشه کافکا مدیریت شده ایجاد کنید

  • خوشه مدیریت شده کافکا را ایجاد کنید. در اینجا، نام خوشه kafka-iot است.
gcloud managed-kafka clusters create kafka-iot \
   
--project=$PROJECT_ID \
   
--location=$REGION \
   
--cpu=3 \
   
--memory=12GiB \
   
--subnets=$SUBNET_PATH \
   
--auto-rebalance

شما باید پاسخی شبیه به زیر دریافت کنید:

Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.                                        
Created cluster [kafka-iot].

ایجاد خوشه 20-30 دقیقه طول می کشد. منتظر تکمیل این عملیات باشید.

یک موضوع ایجاد کنید

  • یک موضوع کافکا مدیریت شده در خوشه ایجاد کنید. در اینجا نام تاپیک kafka-iot-topic است.
gcloud managed-kafka topics create kafka-iot-topic \
   
--cluster=kafka-iot \
   
--location=$REGION \
   
--partitions=10 \
   
--replication-factor=3

شما باید خروجی مشابه زیر دریافت کنید:

Created topic [kafka-iot-topic].

4. یک Publisher راه اندازی کنید

برای انتشار در خوشه مدیریت شده کافکا، یک نمونه ماشین مجازی موتور محاسباتی Google راه اندازی کردیم که می تواند به VPC حاوی زیرشبکه استفاده شده توسط خوشه مدیریت شده کافکا دسترسی داشته باشد. این VM دستگاه های حسگر ارائه شده توسط DemoIOT Solutions را شبیه سازی می کند.

مراحل

  1. نمونه Google Compute Engine VM را ایجاد کنید. در اینجا، نام GCE VM publisher-instance است.
gcloud compute instances create publisher-instance \
   
--scopes=https://www.googleapis.com/auth/cloud-platform \
   
--subnet=$SUBNET_PATH \
   
--zone=$ZONE
  1. به حساب سرویس پیش‌فرض Google Compute Engine اجازه استفاده از سرویس مدیریت‌شده برای آپاچی کافکا را بدهید.
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role=roles/managedkafka.client
  1. برای اتصال به VM از SSH استفاده کنید. از طرف دیگر، از Google Cloud Console برای SSH استفاده کنید .
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
  1. جاوا را برای اجرای ابزارهای خط فرمان کافکا نصب کنید و با استفاده از این دستورات کافکا باینری را دانلود کنید.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz  https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
  1. کتابخانه احراز هویت مدیریت شده کافکا و وابستگی های آن را دانلود کنید و ویژگی های مشتری کافکا را پیکربندی کنید.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security
.protocol=SASL_SSL
sasl
.mechanism=OAUTHBEARER
sasl
.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl
.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF

برای جزئیات بیشتر در مورد راه اندازی دستگاه ناشر، به راه اندازی دستگاه مشتری مراجعه کنید.

5. انتشار در مدیریت کافکا

اکنون که ناشر راه‌اندازی شده است، می‌توانیم از خط فرمان کافکا روی آن برای انتشار برخی از داده‌های ساختگی از GCE VM (شبیه‌سازی دستگاه‌های اینترنت اشیا توسط DemoIOT Solutions) در خوشه Managed Kafka استفاده کنیم.

  1. از آنجایی که ما SSH را به نمونه GCE VM وارد کرده ایم، باید متغیر PROJECT_ID دوباره پر کنیم:
export PROJECT_ID=<project-id>
export REGION=<region>

موارد زیر را جایگزین کنید:

  • <project-id> با نام پروژه GCP که تنظیم کردید.
  • <region> با منطقه ای که خوشه کافکا در آن ایجاد شده است
  1. برای به دست آوردن آدرس IP سرور بوت استرپ کافکا از دستور managed-kafka clusters describe استفاده کنید. از این آدرس می توان برای اتصال به خوشه کافکا استفاده کرد.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. موضوعات موجود در خوشه را فهرست کنید:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties

شما باید بتوانید خروجی زیر را مشاهده کنید که حاوی مبحث kafka-iot-topic که قبلا ایجاد کردیم.

__remote_log_metadata
kafka
-iot-topic
  1. این اسکریپت را کپی کرده و در یک فایل جدید publish_iot_data.sh قرار دهید. برای ایجاد یک فایل جدید در GCE VM، می توانید از ابزاری مانند vim یا nano استفاده کنید.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data
.sh
#!/bin/bash

NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10

generate_sensor_data
() {
  local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
  local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
  local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
  local light_level=$((RANDOM % 1000))

  echo "\"temperature\": $temperature,"
  echo "\"humidity\": $humidity,"
  echo "\"pressure\": $pressure,"
  echo "\"light_level\": $light_level"
}

generate_location_data
() {
  local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
  local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))

  echo "\"latitude\": $latitude,"
  echo "\"longitude\": $longitude"
}

generate_device_status
() {
  local battery_level=$((RANDOM % 101))
  local signal_strength=$((RANDOM % 80 - 100))
  local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
  local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"

  echo "\"battery_level\": $battery_level,"
  echo "\"signal_strength\": $signal_strength,"
  echo "\"connection_type\": \"$connection_type\""
}

publish_to_kafka
() {
  local device_index=$1
  local message_index=$2
  local device_id="sensor-$((device_index % NUM_IDS))"
  local timestamp=$((start_time + (message_index * message_interval)))
  local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")

  local json_data=$(cat <<EOF
{
  "device_id": "$device_id",
  "timestamp": "$date",
  "location": {
$(generate_location_data)
  },
  "sensor_data": {
$(generate_sensor_data)
  },
  "device_status": {
$(generate_device_status)
  },
  "metadata": {
    "sensor_type": "environmental",
    "unit_temperature": "Celsius",
    "unit_humidity": "%" ,
    "unit_pressure": "hPa",
    "unit_light_level": "lux",
    "firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
  }
}
EOF
)

  echo $json_data | jq -rc
}

for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
  for device_index in $(seq 0 $((NUM_IDS - 1))); do
    publish_to_kafka "$device_index" "$message_index"
  done
done | kafka-console-producer.sh \
    --topic kafka-iot-topic \
    --bootstrap-server $1 \
    --producer.config $2

توضیح

  • این اسکریپت پیام‌های JSON را با قرائت‌های حسگر شبیه‌سازی شده ایجاد می‌کند که دارای شناسه دستگاه، مهر زمانی، داده‌های حسگر (دما، رطوبت، فشار، نور)، اطلاعات مکان (طول جغرافیایی، طول جغرافیایی)، وضعیت دستگاه (باتری، سیگنال، نوع اتصال) و برخی فراداده‌ها هستند.
  • این یک جریان پیوسته از پیام‌ها را از تعداد مجموعه‌ای از دستگاه‌های منحصربه‌فرد تولید می‌کند، که هر کدام داده‌ها را در یک بازه زمانی مشخص ارسال می‌کنند و دستگاه‌های IoT دنیای واقعی را تقلید می‌کنند. در اینجا، ما داده‌های 10 دستگاه را منتشر می‌کنیم که هر کدام 20 قرائت را در فاصله زمانی 10 ثانیه انجام می‌دهند.
  • همچنین تمام داده های تولید شده را با استفاده از ابزار خط فرمان تولیدکننده کافکا در موضوع کافکا منتشر می کند.
  1. برخی از وابستگی های مورد استفاده توسط اسکریپت را نصب کنید - بسته bc برای محاسبات ریاضی و بسته jq برای پردازش JSON.
sudo apt-get install bc jq
  1. اسکریپت را به صورت اجرایی تغییر دهید و اسکریپت را اجرا کنید. باید حدود 2 دقیقه طول بکشد تا اجرا شود.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties

با اجرای این دستور که همه رویدادها را چاپ می کند، می توانید بررسی کنید که رویدادها با موفقیت منتشر شده اند. برای خروج، <control-c> فشار دهید.

kafka-console-consumer.sh \
   
--topic kafka-iot-topic \
   
--from-beginning \
   
--bootstrap-server $BOOTSTRAP \
   
--consumer.config client.properties

6. مجموعه Dataproc را راه اندازی کنید

این بخش یک خوشه Dataproc در زیرشبکه VPC ایجاد می کند که در آن خوشه Managed Kafka وجود دارد. این خوشه برای اجرای کارهایی که آمار و بینش‌های بی‌درنگ مورد نیاز DemoIOT Solutions را تولید می‌کنند، استفاده می‌شود.

  1. یک خوشه Dataproc ایجاد کنید. در اینجا نام خوشه dataproc-iot است.
gcloud dataproc clusters create dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION \
    --image-version=2.2-debian12 \
    --enable-component-gateway \
    --subnet=$SUBNET_PATH \
    --master-machine-type=n1-standard-4 \
    --worker-machine-type=n1-standard-4 \
    --num-workers=2 \
    --properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G

شما باید پاسخی شبیه به زیر دریافت کنید:

Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.                                                                                                                                                                      
Waiting for cluster creation operation...done.                                                                                                                                                                    
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].

ایجاد خوشه ممکن است 10-15 دقیقه طول بکشد. منتظر تکمیل موفقیت آمیز این عملیات باشید و با توصیف خوشه بررسی کنید که خوشه در حالت RUNNING است.

gcloud dataproc clusters describe dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION

7. پیام های کافکا را با استفاده از Dataproc پردازش کنید

در این بخش آخر، یک کار Dataproc ارسال می کنید که پیام های منتشر شده را با استفاده از Spark Streaming پردازش می کند. این کار در واقع برخی از آمارها و بینش های بلادرنگ را ایجاد می کند که می تواند توسط DemoIOT Solutions استفاده شود.

  1. این دستور را اجرا کنید تا فایل کار جاری PySpark به نام process_iot.py به صورت محلی ایجاد شود.
cat > process_iot.py <<EOF

#!/bin/python

import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException

JSON_SCHEMA = StructType([
   
StructField("device_id", StringType()),
   
StructField("timestamp", StringType()),
   
StructField(
       
"location",
       
StructType([
           
StructField("latitude", FloatType()),
           
StructField("longitude", FloatType()),
       
]),
   
),
   
StructField(
       
"sensor_data",
       
StructType([
           
StructField("temperature", FloatType()),
           
StructField("humidity", FloatType()),
           
StructField("pressure", FloatType()),
           
StructField("light_level", IntegerType()),
       
]),
   
),
   
StructField(
       
"device_status",
       
StructType([
           
StructField("battery_level", IntegerType()),
           
StructField("signal_strength", IntegerType()),
           
StructField("connection_type", StringType()),
       
]),
   
),
   
StructField(
       
"metadata",
       
StructType([
           
StructField("sensor_type", StringType()),
           
StructField("unit_temperature", StringType()),
           
StructField("unit_humidity", StringType()),
           
StructField("unit_pressure", StringType()),
           
StructField("unit_light_level", StringType()),
           
StructField("firmware_version", StringType()),
       
]),
   
),
])
CLIENT_PROPERTY_KEYS = [
   
"security.protocol",
   
"sasl.mechanism",
   
"sasl.login.callback.handler.class",
   
"sasl.jaas.config",
]


def get_client_properties(client_properties_path: str):
   
# Parse client properties file
   
parsed_path = urlparse(client_properties_path)
   
if parsed_path.scheme != "gs":
       
raise ValueError("Invalid GCS path for client properties. Must start with gs://.")

   
bucket_name = parsed_path.netloc
   
blob_name = parsed_path.path.lstrip("/")

   
client = storage.Client()
   
bucket = client.bucket(bucket_name)
   
blob = bucket.blob(blob_name)
   
file_content = "[DEFAULT]\n" + blob.download_as_text()

   
config = configparser.ConfigParser()
   
config.read_string(file_content)

   
client_properties = dict()
   
for key in CLIENT_PROPERTY_KEYS:
       
client_properties[key] = config.get("DEFAULT", key)

   
print(f"Client properties: {client_properties}")
   
return client_properties

def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
   
print("Starting initial data processing...")
   
initial_rows = (
       
spark.readStream.format("kafka")
       
.option("kafka.bootstrap.servers", bootstrap_server_address)
       
.option("startingOffsets", "earliest")
       
.option("subscribe", "kafka-iot-topic")
       
.option("kafka.security.protocol", client_properties["security.protocol"])
       
.option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
       
.option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
       
.option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
       
.load()
   
)

   
initial_rows = (
       
initial_rows.selectExpr("CAST(value AS STRING)")
       
.select(from_json(col("value"), JSON_SCHEMA).alias("data"))
       
.select("data.*")
   
)
   
   
# Print first 20 rows
   
def print_top_rows(batch_df, batch_id):
       
if batch_df.count() > 0:
           
print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
           
batch_df.limit(20).show(truncate=False)

   
initial_query_print = initial_rows.writeStream \
       
.foreachBatch(print_top_rows) \
       
.trigger(once=True) \
       
.start()
   
queries_to_await.append(initial_query_print)

   
# Calculate and print average temperatures
   
def process_initial_avg_temp(batch_df, batch_id):
       
if batch_df.count() > 0:
           
print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
           
current_averages = (
               
batch_df.select("device_id", "sensor_data.temperature")
               
.groupBy("device_id")
               
.agg(avg("temperature").alias("average_temperature"))
               
.orderBy("device_id")
           
)
           
current_averages.show(truncate=False)

   
initial_query_avg_temp = initial_rows.writeStream \
       
.foreachBatch(process_initial_avg_temp) \
       
.trigger(once=True) \
       
.start()
   
queries_to_await.append(initial_query_avg_temp)
   
   
# Write data to GCS
   
initial_data_gcs_writer = (
       
initial_rows.writeStream.outputMode("append")
       
.format("avro")
       
.option("path", store_data_gcs_path+"/tables/iot_avro/")
       
.option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
       
.trigger(once=True) \
       
.start()
   
)
   
queries_to_await.append(initial_data_gcs_writer)


def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):

   
client_properties = get_client_properties(client_properties_path)

   
# Create SparkSession
   
spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
   
   
queries_to_await = []

   
process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
   
   
# Wait for all queries to terminate
   
for query in queries_to_await:
       
try:
           
query.awaitTermination()
       
except Exception as e:
           
print(f"Error awaiting query: {e}")
       
finally:
           
query.stop()

   
spark.stop()

if __name__ == "__main__":
   
if len(sys.argv) < 4:
       
print("Invalid number of arguments passed ", len(sys.argv))
       
print(
           
"Usage: ",
           
sys.argv[0],
           
" <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
       
)
       
raise ValueError("Invalid number of arguments passed.")

   
parsed_data_path = urlparse(sys.argv[3])
   
if parsed_data_path.scheme != "gs":
       
raise ValueError("Invalid GCS path for storing data. Must start with gs://.")


   
main(sys.argv[1], sys.argv[2], sys.argv[3])


EOF

توضیح

  • این کد یک کار جریان ساخت یافته PySpark را برای خواندن داده ها از یک موضوع مشخص شده کافکا تنظیم می کند. از آدرس راه‌انداز سرور کافکا و تنظیمات کافکا بارگیری شده از یک فایل پیکربندی GCS برای اتصال و احراز هویت با کارگزار کافکا استفاده می‌کند.
  • ابتدا داده‌های خام کافکا را به‌عنوان جریانی از آرایه‌های بایتی می‌خواند و آن آرایه‌های بایت را به رشته‌ها می‌فرستد و json_schema با استفاده از StructType Spark برای تعیین ساختار داده‌ها (شناسه دستگاه، مهر زمانی، مکان، داده‌های حسگر و غیره) اعمال می‌کند.
  • 10 ردیف اول را برای پیش نمایش در کنسول چاپ می کند، میانگین دمای هر سنسور را محاسبه می کند و تمام داده ها را در سطل GCS با فرمت avro می نویسد. Avro یک سیستم سریال‌سازی داده مبتنی بر ردیف است که داده‌های ساخت‌یافته را به‌طور کارآمد در یک قالب باینری فشرده و تعریف‌شده از طریق طرحواره ذخیره می‌کند و تکامل طرحواره، بی‌طرفی زبان و فشرده‌سازی بالا را برای پردازش داده‌های در مقیاس بزرگ ارائه می‌دهد.
  1. فایل client.properties را ایجاد کنید و متغیر محیطی را برای آدرس بوت استرپ سرور کافکا پر کنید.
cat <<EOF> client.properties
security
.protocol=SASL_SSL
sasl
.mechanism=OAUTHBEARER
sasl
.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl
.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. فایل های process_iot.py و client.properties را در سطل Google Cloud Storage خود آپلود کنید تا بتوان از آنها برای کار Dataproc استفاده کرد.
gsutil cp process_iot.py client.properties $BUCKET
  1. برخی از شیشه های وابستگی را برای کار Dataproc در سطل GCS خود کپی کنید. این دایرکتوری حاوی jarهایی است که برای اجرای کارهای Spark Streaming با کافکا، و کتابخانه احراز هویت مدیریت شده کافکا و وابستگی‌های آن، از Set up a client گرفته شده است.
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
  1. کار Spark را به خوشه Dataproc ارسال کنید.
gcloud dataproc jobs submit pyspark \
    $BUCKET/process_iot.py \
    --project=$PROJECT_ID \
    --region=$REGION \
    --cluster=dataproc-iot \
    --properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
    -- $BOOTSTRAP $BUCKET/client.properties $BUCKET

گزارش های درایور Spark چاپ خواهند شد. همچنین باید بتوانید این جداول ثبت شده در کنسول و داده های ذخیره شده در سطل GCS خود را مشاهده کنید.

25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp           |location        |sensor_data              |device_status       |metadata                                    |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+

25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954  |
|sensor-1 |20.475             |
|sensor-2 |20.475             |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138  |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422  |
|sensor-9 |20.405000019073487 |
+---------+-------------------+

8. پاک کن

مراحل پاکسازی منابع را پس از تکمیل کد لبه دنبال کنید.

  1. خوشه Managed Kafka، Publisher GCE VM و Dataproc را حذف کنید.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
  1. زیرشبکه و شبکه VPC خود را حذف کنید.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
  1. اگر دیگر نمی‌خواهید از داده‌ها استفاده کنید، سطل GCS خود را حذف کنید.
gcloud storage rm --recursive $BUCKET

9. تبریک میگم

تبریک می‌گوییم، شما با مدیریت کافکا و دیتاپروک یک خط لوله پردازش داده‌های اینترنت اشیا را با موفقیت ایجاد کردید که به راه‌حل‌های DemoIOT کمک می‌کند تا بینش‌های بی‌درنگ درباره داده‌های منتشر شده توسط دستگاه‌های خود کسب کنند!

شما یک خوشه مدیریت شده کافکا ایجاد کردید، رویدادهای IoT را در آن منتشر کردید و یک کار Dataproc را اجرا کردید که از جریان Spark برای پردازش این رویدادها در زمان واقعی استفاده می کرد. اکنون مراحل کلیدی مورد نیاز برای ایجاد خطوط لوله داده با استفاده از Managed Kafka و Dataproc را می دانید.

اسناد مرجع