پردازش بیدرنگ داده های اینترنت اشیا با استفاده از Dataproc و سرویس مدیریت شده گوگل برای آپاچی کافکا

۱. مقدمه

2efacab8643a653b.png

آخرین به‌روزرسانی: 2024-06-10

پیشینه

دستگاه‌های اینترنت اشیا (IoT)، از راهکارهای خانه هوشمند گرفته تا حسگرهای صنعتی، حجم عظیمی از داده‌ها را در لبه شبکه تولید می‌کنند. این داده‌ها برای موارد استفاده متنوعی مانند نظارت بر دستگاه، ردیابی، تشخیص، نظارت، شخصی‌سازی، بهینه‌سازی ناوگان و موارد دیگر بسیار ارزشمند هستند. سرویس مدیریت‌شده گوگل برای آپاچی کافکا، روشی مقیاس‌پذیر و بادوام برای دریافت و ذخیره این جریان مداوم داده‌ها به روشی سازگار با OSS، آسان برای استفاده و ایمن ارائه می‌دهد، در حالی که Google Cloud Dataproc امکان پردازش این مجموعه داده‌های بزرگ را برای تجزیه و تحلیل داده‌ها با استفاده از خوشه‌های آپاچی اسپارک و هادوپ فراهم می‌کند.

آنچه خواهید ساخت

در این آزمایشگاه کد، شما قصد دارید یک خط لوله پردازش داده‌های اینترنت اشیا با استفاده از سرویس مدیریت‌شده گوگل برای آپاچی کافکا، دیتاپروک، پایتون و آپاچی اسپارک بسازید که تجزیه و تحلیل بلادرنگ انجام می‌دهد. خط لوله شما:

  • انتشار داده‌ها از دستگاه‌های اینترنت اشیا به یک خوشه مدیریت‌شده کافکا با استفاده از ماشین‌های مجازی GCE
  • داده‌ها را از خوشه مدیریت کافکا به خوشه Dataproc منتقل کنید
  • پردازش داده‌ها با استفاده از یک کار Dataproc Spark Streaming

آنچه یاد خواهید گرفت

  • نحوه ایجاد خوشه‌های مدیریت‌شده توسط گوگل کافکا و دیتاپروک
  • نحوه اجرای کارهای استریمینگ با استفاده از Dataproc

آنچه نیاز دارید

۲. مرور کلی

برای این کدلب، بیایید داستان یک شرکت ساختگی به نام DemoIOT Solutions را دنبال کنیم. DemoIOT Solutions دستگاه‌های حسگری ارائه می‌دهد که داده‌های مربوط به دما، رطوبت، فشار، سطح نور و مکان را اندازه‌گیری و ارسال می‌کنند. آنها می‌خواهند خطوط لوله‌ای راه‌اندازی کنند که این داده‌ها را پردازش کرده و آمار لحظه‌ای را به مشتریان خود نشان دهند. با استفاده از چنین خطوط لوله‌ای، آنها می‌توانند خدمات متنوعی را به مشتریان خود ارائه دهند، مانند نظارت، پیشنهادات خودکار، هشدارها و بینش‌هایی در مورد مکان‌هایی که مشتریان حسگرهای خود را نصب کرده‌اند.

برای انجام این کار، از یک ماشین مجازی GCE برای شبیه‌سازی دستگاه اینترنت اشیا استفاده خواهیم کرد. دستگاه، داده‌ها را در یک تاپیک کافکا در خوشه Google Managed Kafka منتشر می‌کند که توسط یک کار جریان‌سازی Dataproc خوانده و پردازش می‌شود. تنظیمات پیش‌نیاز و صفحات بعدی شما را برای انجام تمام این مراحل راهنمایی می‌کنند.

تنظیمات پیش‌نیاز

  1. نام و شماره پروژه را برای پروژه خود پیدا کنید. برای مرجع به «یافتن نام، شماره و شناسه پروژه» مراجعه کنید.
  2. زیرشبکه VPC. این امکان اتصال بین ماشین مجازی GCE، خوشه Kafka و خوشه Dataproc را فراهم می‌کند. برای فهرست کردن زیرشبکه‌های موجود با استفاده از gcloud CLI، این دستور را دنبال کنید. در صورت نیاز، از دستور ایجاد یک شبکه VPC در حالت خودکار استفاده کنید که یک شبکه VPC با زیرشبکه در هر منطقه Google Cloud ایجاد می‌کند. اگرچه، برای اهداف این آزمایشگاه کد، ما فقط از یک زیرشبکه از یک منطقه واحد استفاده خواهیم کرد.
  • در این زیرشبکه، مطمئن شوید که یک قانون فایروال وجود دارد که به همه ورودی‌ها از tcp:22 اجازه می‌دهد، که برای SSH ضروری است. این قانون هنگام ایجاد شبکه، در بخش قوانین فایروال قابل انتخاب خواهد بود، بنابراین مطمئن شوید که آن را انتخاب می‌کنید.
  1. سطل GCS. برای ذخیره منابع کار Dataproc و ذخیره داده‌های پردازش‌شده، به یک سطل ذخیره‌سازی Google Cloud نیاز دارید. اگر یکی ندارید، می‌توانید یکی در پروژه GCP خود ایجاد کنید .

متغیرهای محیطی را پر کنید

در ترمینال خود که gcloud CLI را اجرا می‌کنید، این متغیرهای محیطی را پر کنید تا بعداً بتوان از آنها استفاده کرد.

export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>

موارد زیر را جایگزین کنید:

  • <project-id> به همراه نام پروژه GCP که راه‌اندازی کرده‌اید.
  • <project-number> با نام شماره پروژه از مرحله پیش‌نیاز ۱.
  • <region> با نام منطقه‌ای از مناطق و نواحی موجود که می‌خواهید استفاده کنید. برای مثال، می‌توانیم us-central1 استفاده کنیم.
  • <zone> با نام منطقه از مناطق موجود و مناطق تحت منطقه‌ای که قبلاً انتخاب کرده‌اید. به عنوان مثال، اگر us-central1 به عنوان منطقه انتخاب کرده‌اید، می‌توانید us-central1-f به عنوان منطقه استفاده کنید. این منطقه برای ایجاد ماشین مجازی GCE که دستگاه‌های اینترنت اشیا را شبیه‌سازی می‌کند، استفاده خواهد شد. مطمئن شوید که منطقه شما در منطقه‌ای که انتخاب کرده‌اید قرار دارد .
  • <subnet-path> با مسیر کامل زیرشبکه از مرحله پیش‌نیاز ۲. مقدار این باید به فرمت projects/<project-id>/regions/<region>/subnetworks/<subnet-name> باشد.
  • <bucket-name> با نام سطل GCS از مرحله پیش‌نیاز ۳.

۳. راه‌اندازی کافکای مدیریت‌شده توسط گوگل

این بخش یک خوشه مدیریت‌شده‌ی کافکا توسط گوگل راه‌اندازی می‌کند که سرور کافکا را مستقر می‌کند و یک تاپیک روی این سرور ایجاد می‌کند که داده‌های اینترنت اشیا پس از عضویت در آن می‌توانند منتشر و خوانده شوند. DemoIOT Solutions می‌تواند این خوشه را طوری تنظیم کند که تمام دستگاه‌هایشان داده‌ها را در آن منتشر کنند.

ایجاد یک خوشه مدیریت‌شده کافکا

  • خوشه مدیریت‌شده کافکا را ایجاد کنید. در اینجا، نام خوشه kafka-iot است.
gcloud managed-kafka clusters create kafka-iot \
    --project=$PROJECT_ID \
    --location=$REGION \
    --cpu=3 \
    --memory=12GiB \
    --subnets=$SUBNET_PATH \
    --auto-rebalance

شما باید پاسخی مشابه زیر دریافت کنید:

Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.                                         
Created cluster [kafka-iot].

ایجاد خوشه 20 تا 30 دقیقه طول می‌کشد. منتظر اتمام این عملیات باشید.

ایجاد یک موضوع

  • یک تاپیک مدیریت‌شده‌ی کافکا روی کلاستر ایجاد کنید. در اینجا، نام تاپیک kafka-iot-topic است.
gcloud managed-kafka topics create kafka-iot-topic \
    --cluster=kafka-iot \
    --location=$REGION \
    --partitions=10 \
    --replication-factor=3

شما باید خروجی مشابه زیر دریافت کنید:

Created topic [kafka-iot-topic].

۴. یک ناشر راه‌اندازی کنید

برای انتشار در خوشه مدیریت‌شده کافکا، ما یک نمونه ماشین مجازی Google Compute Engine راه‌اندازی می‌کنیم که می‌تواند به VPC حاوی زیرشبکه مورد استفاده توسط خوشه مدیریت‌شده کافکا دسترسی داشته باشد. این ماشین مجازی، دستگاه‌های حسگر ارائه شده توسط DemoIOT Solutions را شبیه‌سازی می‌کند.

مراحل

  1. نمونه ماشین مجازی Google Compute Engine را ایجاد کنید. در اینجا، نام ماشین مجازی GCE، publisher-instance است.
gcloud compute instances create publisher-instance \
    --scopes=https://www.googleapis.com/auth/cloud-platform \
    --subnet=$SUBNET_PATH \
    --zone=$ZONE
  1. به حساب کاربری سرویس پیش‌فرض Google Compute Engine مجوزهای استفاده از سرویس مدیریت‌شده برای آپاچی کافکا را بدهید.
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role=roles/managedkafka.client
  1. برای اتصال به ماشین مجازی از SSH استفاده کنید. روش دیگر، استفاده از کنسول Google Cloud برای SSH است .
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
  1. برای اجرای ابزارهای خط فرمان کافکا، جاوا را نصب کنید و با استفاده از این دستورات، فایل باینری کافکا را دانلود کنید.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz  https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
  1. کتابخانه احراز هویت مدیریت‌شده کافکا و وابستگی‌های آن را دانلود کنید و ویژگی‌های کلاینت کافکا را پیکربندی کنید.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF

برای جزئیات بیشتر در مورد تنظیمات دستگاه ناشر، به «تنظیم دستگاه کلاینت» مراجعه کنید.

۵. انتشار در Managed Kafka

اکنون که ناشر راه‌اندازی شده است، می‌توانیم از خط فرمان کافکا روی آن برای انتشار برخی داده‌های ساختگی از ماشین مجازی GCE (شبیه‌سازی دستگاه‌های اینترنت اشیا توسط DemoIOT Solutions) به خوشه مدیریت‌شده کافکا استفاده کنیم.

  1. از آنجایی که به نمونه ماشین مجازی GCE از طریق SSH متصل شده‌ایم، باید متغیر PROJECT_ID را دوباره پر کنیم:
export PROJECT_ID=<project-id>
export REGION=<region>

موارد زیر را جایگزین کنید:

  • <project-id> به همراه نام پروژه GCP که راه‌اندازی کرده‌اید.
  • <region> شامل منطقه‌ای که خوشه کافکا در آن ایجاد شده است
  1. از دستور managed-kafka clusters describe برای دریافت آدرس IP سرور بوت‌استرپ کافکا استفاده کنید. از این آدرس می‌توان برای اتصال به خوشه کافکا استفاده کرد.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. موضوعات موجود در خوشه را فهرست کنید:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties

شما باید بتوانید خروجی زیر را ببینید که شامل تاپیک kafka-iot-topic که قبلاً ایجاد کردیم.

__remote_log_metadata
kafka-iot-topic
  1. این اسکریپت را کپی کرده و در یک فایل جدید publish_iot_data.sh قرار دهید. برای ایجاد یک فایل جدید در ماشین مجازی GCE، می‌توانید از ابزاری مانند vim یا nano استفاده کنید.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash

NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10

generate_sensor_data() {
  local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
  local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
  local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
  local light_level=$((RANDOM % 1000))

  echo "\"temperature\": $temperature,"
  echo "\"humidity\": $humidity,"
  echo "\"pressure\": $pressure,"
  echo "\"light_level\": $light_level"
}

generate_location_data() {
  local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
  local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))

  echo "\"latitude\": $latitude,"
  echo "\"longitude\": $longitude"
}

generate_device_status() {
  local battery_level=$((RANDOM % 101))
  local signal_strength=$((RANDOM % 80 - 100))
  local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
  local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"

  echo "\"battery_level\": $battery_level,"
  echo "\"signal_strength\": $signal_strength,"
  echo "\"connection_type\": \"$connection_type\""
}

publish_to_kafka() {
  local device_index=$1
  local message_index=$2
  local device_id="sensor-$((device_index % NUM_IDS))"
  local timestamp=$((start_time + (message_index * message_interval)))
  local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")

  local json_data=$(cat <<EOF
{
  "device_id": "$device_id",
  "timestamp": "$date",
  "location": {
$(generate_location_data)
  },
  "sensor_data": {
$(generate_sensor_data)
  },
  "device_status": {
$(generate_device_status)
  },
  "metadata": {
    "sensor_type": "environmental",
    "unit_temperature": "Celsius",
    "unit_humidity": "%" ,
    "unit_pressure": "hPa",
    "unit_light_level": "lux",
    "firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
  }
}
EOF
)

  echo $json_data | jq -rc
}

for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
  for device_index in $(seq 0 $((NUM_IDS - 1))); do
    publish_to_kafka "$device_index" "$message_index"
  done
done | kafka-console-producer.sh \
    --topic kafka-iot-topic \
    --bootstrap-server $1 \
    --producer.config $2

توضیح

  • این اسکریپت پیام‌های JSON با خوانش‌های شبیه‌سازی‌شده حسگر ایجاد می‌کند که شامل شناسه دستگاه، مهر زمانی، داده‌های حسگر (دما، رطوبت، فشار، نور)، اطلاعات مکان (عرض جغرافیایی، طول جغرافیایی)، وضعیت دستگاه (باتری، سیگنال، نوع اتصال) و برخی فراداده‌ها است.
  • این سیستم، جریان پیوسته‌ای از پیام‌ها را از تعداد مشخصی دستگاه منحصر به فرد تولید می‌کند که هر کدام در یک بازه زمانی مشخص، داده‌ها را ارسال می‌کنند و دستگاه‌های اینترنت اشیا در دنیای واقعی را تقلید می‌کنند. در اینجا، ما داده‌های 10 دستگاه را منتشر می‌کنیم که هر کدام 20 خوانش را در یک بازه زمانی 10 ثانیه‌ای تولید می‌کنند.
  • همچنین تمام داده‌های تولید شده را با استفاده از ابزار خط فرمان تولیدکننده کافکا، در تاپیک کافکا منتشر می‌کند.
  1. برخی از وابستگی‌های مورد استفاده اسکریپت را نصب کنید - بسته bc برای محاسبات ریاضی و بسته jq برای پردازش JSON.
sudo apt-get install bc jq
  1. اسکریپت را به یک فایل اجرایی تغییر دهید و اجرا کنید. اجرای آن حدود ۲ دقیقه طول می‌کشد.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties

با اجرای این دستور که تمام رویدادها را چاپ می‌کند، می‌توانید بررسی کنید که رویدادها با موفقیت منتشر شده‌اند. برای خروج <control-c> را فشار دهید.

kafka-console-consumer.sh \
    --topic kafka-iot-topic \
    --from-beginning \
    --bootstrap-server $BOOTSTRAP \
    --consumer.config client.properties

۶. راه‌اندازی خوشه Dataproc

این بخش یک کلاستر Dataproc در زیرشبکه VPC ایجاد می‌کند که کلاستر Managed Kafka در آن قرار دارد. این کلاستر برای اجرای کارهایی که آمار و بینش‌های بلادرنگ مورد نیاز DemoIOT Solutions را تولید می‌کنند، استفاده خواهد شد.

  1. یک کلاستر Dataproc ایجاد کنید. در اینجا نام کلاستر dataproc-iot است.
gcloud dataproc clusters create dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION \
    --image-version=2.2-debian12 \
    --enable-component-gateway \
    --subnet=$SUBNET_PATH \
    --master-machine-type=n1-standard-4 \
    --worker-machine-type=n1-standard-4 \
    --num-workers=2 \
    --properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G

شما باید پاسخی مشابه زیر دریافت کنید:

Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.                                                                                                                                                                      
Waiting for cluster creation operation...done.                                                                                                                                                                    
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].

ایجاد خوشه ممکن است ۱۰ تا ۱۵ دقیقه طول بکشد. منتظر اتمام موفقیت‌آمیز این عملیات باشید و با توصیف خوشه، بررسی کنید که خوشه در حالت RUNNING باشد.

gcloud dataproc clusters describe dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION

۷. پردازش پیام‌های کافکا با استفاده از Dataproc

در این بخش آخر، شما یک کار Dataproc ارسال خواهید کرد که پیام‌های منتشر شده را با استفاده از Spark Streaming پردازش می‌کند. این کار در واقع برخی آمارها و بینش‌های بلادرنگ تولید می‌کند که می‌تواند توسط DemoIOT Solutions مورد استفاده قرار گیرد.

  1. این دستور را اجرا کنید تا فایل کار استریمینگ PySpark با نام process_iot.py به صورت محلی ایجاد شود.
cat > process_iot.py <<EOF

#!/bin/python

import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException

JSON_SCHEMA = StructType([
    StructField("device_id", StringType()),
    StructField("timestamp", StringType()),
    StructField(
        "location",
        StructType([
            StructField("latitude", FloatType()),
            StructField("longitude", FloatType()),
        ]),
    ),
    StructField(
        "sensor_data",
        StructType([
            StructField("temperature", FloatType()),
            StructField("humidity", FloatType()),
            StructField("pressure", FloatType()),
            StructField("light_level", IntegerType()),
        ]),
    ),
    StructField(
        "device_status",
        StructType([
            StructField("battery_level", IntegerType()),
            StructField("signal_strength", IntegerType()),
            StructField("connection_type", StringType()),
        ]),
    ),
    StructField(
        "metadata",
        StructType([
            StructField("sensor_type", StringType()),
            StructField("unit_temperature", StringType()),
            StructField("unit_humidity", StringType()),
            StructField("unit_pressure", StringType()),
            StructField("unit_light_level", StringType()),
            StructField("firmware_version", StringType()),
        ]),
    ),
])
CLIENT_PROPERTY_KEYS = [
    "security.protocol",
    "sasl.mechanism",
    "sasl.login.callback.handler.class",
    "sasl.jaas.config",
]


def get_client_properties(client_properties_path: str):
    # Parse client properties file
    parsed_path = urlparse(client_properties_path)
    if parsed_path.scheme != "gs":
        raise ValueError("Invalid GCS path for client properties. Must start with gs://.")

    bucket_name = parsed_path.netloc
    blob_name = parsed_path.path.lstrip("/")

    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    file_content = "[DEFAULT]\n" + blob.download_as_text()

    config = configparser.ConfigParser()
    config.read_string(file_content)

    client_properties = dict()
    for key in CLIENT_PROPERTY_KEYS:
        client_properties[key] = config.get("DEFAULT", key)

    print(f"Client properties: {client_properties}")
    return client_properties

def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
    print("Starting initial data processing...")
    initial_rows = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_server_address)
        .option("startingOffsets", "earliest")
        .option("subscribe", "kafka-iot-topic")
        .option("kafka.security.protocol", client_properties["security.protocol"])
        .option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
        .option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
        .option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
        .load()
    )

    initial_rows = (
        initial_rows.selectExpr("CAST(value AS STRING)")
        .select(from_json(col("value"), JSON_SCHEMA).alias("data"))
        .select("data.*")
    )
    
    # Print first 20 rows
    def print_top_rows(batch_df, batch_id):
        if batch_df.count() > 0:
            print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
            batch_df.limit(20).show(truncate=False)

    initial_query_print = initial_rows.writeStream \
        .foreachBatch(print_top_rows) \
        .trigger(once=True) \
        .start()
    queries_to_await.append(initial_query_print)

    # Calculate and print average temperatures
    def process_initial_avg_temp(batch_df, batch_id):
        if batch_df.count() > 0:
            print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
            current_averages = (
                batch_df.select("device_id", "sensor_data.temperature")
                .groupBy("device_id")
                .agg(avg("temperature").alias("average_temperature"))
                .orderBy("device_id")
            )
            current_averages.show(truncate=False)

    initial_query_avg_temp = initial_rows.writeStream \
        .foreachBatch(process_initial_avg_temp) \
        .trigger(once=True) \
        .start()
    queries_to_await.append(initial_query_avg_temp)
    
    # Write data to GCS
    initial_data_gcs_writer = (
        initial_rows.writeStream.outputMode("append")
        .format("avro")
        .option("path", store_data_gcs_path+"/tables/iot_avro/")
        .option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
        .trigger(once=True) \
        .start()
    )
    queries_to_await.append(initial_data_gcs_writer)


def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):

    client_properties = get_client_properties(client_properties_path)

    # Create SparkSession
    spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
    
    queries_to_await = []

    process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
    
    # Wait for all queries to terminate
    for query in queries_to_await:
        try:
            query.awaitTermination()
        except Exception as e:
            print(f"Error awaiting query: {e}")
        finally:
            query.stop()

    spark.stop()

if __name__ == "__main__":
    if len(sys.argv) < 4:
        print("Invalid number of arguments passed ", len(sys.argv))
        print(
            "Usage: ",
            sys.argv[0],
            " <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
        )
        raise ValueError("Invalid number of arguments passed.")

    parsed_data_path = urlparse(sys.argv[3])
    if parsed_data_path.scheme != "gs":
        raise ValueError("Invalid GCS path for storing data. Must start with gs://.")


    main(sys.argv[1], sys.argv[2], sys.argv[3])


EOF

توضیح

  • این کد یک کار جریان‌سازی ساختاریافته‌ی PySpark را برای خواندن داده‌ها از یک موضوع مشخص‌شده‌ی کافکا تنظیم می‌کند. این کار از آدرس بوت‌استرپ سرور کافکا ارائه‌شده و پیکربندی‌های کافکا که از یک فایل پیکربندی GCS بارگذاری شده‌اند، برای اتصال و احراز هویت با کارگزار کافکا استفاده می‌کند.
  • ابتدا داده‌های خام را از کافکا به عنوان جریانی از آرایه‌های بایت می‌خواند، و آن آرایه‌های بایت را به رشته تبدیل می‌کند، و json_schema را با استفاده از StructType اسپارک برای مشخص کردن ساختار داده‌ها (شناسه دستگاه، مهر زمانی، مکان، داده‌های حسگر و غیره) اعمال می‌کند.
  • این سیستم، 10 ردیف اول را برای پیش‌نمایش در کنسول چاپ می‌کند، میانگین دما را برای هر حسگر محاسبه می‌کند و تمام داده‌ها را با فرمت avro در سطل GCS می‌نویسد. Avro یک سیستم سریال‌سازی داده مبتنی بر ردیف است که داده‌های ساختاریافته را به طور مؤثر در یک قالب دودویی فشرده و تعریف‌شده توسط طرحواره ذخیره می‌کند و تکامل طرحواره، بی‌طرفی زبان و فشرده‌سازی بالا را برای پردازش داده‌های در مقیاس بزرگ ارائه می‌دهد.
  1. فایل client.properties را ایجاد کنید و متغیر محیطی را برای آدرس bootstrap سرور kafka پر کنید.
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. فایل‌های process_iot.py و client.properties را در مخزن ذخیره‌سازی ابری گوگل خود آپلود کنید تا بتوانند توسط کار Dataproc مورد استفاده قرار گیرند.
gsutil cp process_iot.py client.properties $BUCKET
  1. برخی از فایل‌های jar مربوط به وابستگی‌های مربوط به کار Dataproc را در مخزن GCS خود کپی کنید. این دایرکتوری شامل jarهایی است که برای اجرای کارهای Spark Streaming با Kafka مورد نیاز هستند، و کتابخانه احراز هویت Managed Kafka و وابستگی‌های آن، که از Setup a client machine گرفته شده است.
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
  1. کار Spark را به خوشه Dataproc ارسال کنید.
gcloud dataproc jobs submit pyspark \
    $BUCKET/process_iot.py \
    --project=$PROJECT_ID \
    --region=$REGION \
    --cluster=dataproc-iot \
    --properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
    -- $BOOTSTRAP $BUCKET/client.properties $BUCKET

گزارش‌های درایور اسپارک چاپ خواهند شد. همچنین باید بتوانید این جداول ثبت‌شده در کنسول و داده‌های ذخیره‌شده در سطل GCS خود را مشاهده کنید.

25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp           |location        |sensor_data              |device_status       |metadata                                    |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+

25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954  |
|sensor-1 |20.475             |
|sensor-2 |20.475             |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138  |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422  |
|sensor-9 |20.405000019073487 |
+---------+-------------------+

۸. تمیز کردن

پس از تکمیل codelab، مراحل پاکسازی منابع را دنبال کنید.

  1. خوشه مدیریت‌شده کافکا، ماشین مجازی ناشر GCE و خوشه Dataproc را حذف کنید.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
  1. زیرشبکه و شبکه VPC خود را حذف کنید.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
  1. اگر دیگر نمی‌خواهید از داده‌ها استفاده کنید، سطل GCS خود را حذف کنید.
gcloud storage rm --recursive $BUCKET

۹. تبریک

تبریک می‌گویم، شما با موفقیت یک خط لوله پردازش داده‌های اینترنت اشیا با Manage Kafka و Dataproc ایجاد کرده‌اید که به DemoIOT Solutions کمک می‌کند تا بینش‌های بلادرنگ در مورد داده‌های منتشر شده توسط دستگاه‌های خود کسب کند!

شما یک کلاستر Managed Kafka ایجاد کردید، رویدادهای IoT را در آن منتشر کردید و یک کار Dataproc را اجرا کردید که از Spark streaming برای پردازش این رویدادها به صورت بلادرنگ استفاده می‌کرد. اکنون مراحل کلیدی مورد نیاز برای ایجاد خطوط لوله داده با استفاده از Managed Kafka و Dataproc را می‌دانید.

اسناد مرجع