۱. مقدمه

آخرین بهروزرسانی: 2024-06-10
پیشینه
دستگاههای اینترنت اشیا (IoT)، از راهکارهای خانه هوشمند گرفته تا حسگرهای صنعتی، حجم عظیمی از دادهها را در لبه شبکه تولید میکنند. این دادهها برای موارد استفاده متنوعی مانند نظارت بر دستگاه، ردیابی، تشخیص، نظارت، شخصیسازی، بهینهسازی ناوگان و موارد دیگر بسیار ارزشمند هستند. سرویس مدیریتشده گوگل برای آپاچی کافکا، روشی مقیاسپذیر و بادوام برای دریافت و ذخیره این جریان مداوم دادهها به روشی سازگار با OSS، آسان برای استفاده و ایمن ارائه میدهد، در حالی که Google Cloud Dataproc امکان پردازش این مجموعه دادههای بزرگ را برای تجزیه و تحلیل دادهها با استفاده از خوشههای آپاچی اسپارک و هادوپ فراهم میکند.
آنچه خواهید ساخت
در این آزمایشگاه کد، شما قصد دارید یک خط لوله پردازش دادههای اینترنت اشیا با استفاده از سرویس مدیریتشده گوگل برای آپاچی کافکا، دیتاپروک، پایتون و آپاچی اسپارک بسازید که تجزیه و تحلیل بلادرنگ انجام میدهد. خط لوله شما:
- انتشار دادهها از دستگاههای اینترنت اشیا به یک خوشه مدیریتشده کافکا با استفاده از ماشینهای مجازی GCE
- دادهها را از خوشه مدیریت کافکا به خوشه Dataproc منتقل کنید
- پردازش دادهها با استفاده از یک کار Dataproc Spark Streaming
آنچه یاد خواهید گرفت
- نحوه ایجاد خوشههای مدیریتشده توسط گوگل کافکا و دیتاپروک
- نحوه اجرای کارهای استریمینگ با استفاده از Dataproc
آنچه نیاز دارید
- یک حساب کاربری GCP فعال با پروژه تنظیم شده. اگر ندارید، میتوانید برای یک دوره آزمایشی رایگان ثبت نام کنید.
- نصب و پیکربندی رابط خط فرمان gcloud. میتوانید دستورالعملهای نصب رابط خط فرمان gcloud را روی سیستم عامل خود دنبال کنید.
- API های فعال برای Google Managed Kafka و Dataproc در پروژه GCP شما.
۲. مرور کلی
برای این کدلب، بیایید داستان یک شرکت ساختگی به نام DemoIOT Solutions را دنبال کنیم. DemoIOT Solutions دستگاههای حسگری ارائه میدهد که دادههای مربوط به دما، رطوبت، فشار، سطح نور و مکان را اندازهگیری و ارسال میکنند. آنها میخواهند خطوط لولهای راهاندازی کنند که این دادهها را پردازش کرده و آمار لحظهای را به مشتریان خود نشان دهند. با استفاده از چنین خطوط لولهای، آنها میتوانند خدمات متنوعی را به مشتریان خود ارائه دهند، مانند نظارت، پیشنهادات خودکار، هشدارها و بینشهایی در مورد مکانهایی که مشتریان حسگرهای خود را نصب کردهاند.
برای انجام این کار، از یک ماشین مجازی GCE برای شبیهسازی دستگاه اینترنت اشیا استفاده خواهیم کرد. دستگاه، دادهها را در یک تاپیک کافکا در خوشه Google Managed Kafka منتشر میکند که توسط یک کار جریانسازی Dataproc خوانده و پردازش میشود. تنظیمات پیشنیاز و صفحات بعدی شما را برای انجام تمام این مراحل راهنمایی میکنند.
تنظیمات پیشنیاز
- نام و شماره پروژه را برای پروژه خود پیدا کنید. برای مرجع به «یافتن نام، شماره و شناسه پروژه» مراجعه کنید.
- زیرشبکه VPC. این امکان اتصال بین ماشین مجازی GCE، خوشه Kafka و خوشه Dataproc را فراهم میکند. برای فهرست کردن زیرشبکههای موجود با استفاده از gcloud CLI، این دستور را دنبال کنید. در صورت نیاز، از دستور ایجاد یک شبکه VPC در حالت خودکار استفاده کنید که یک شبکه VPC با زیرشبکه در هر منطقه Google Cloud ایجاد میکند. اگرچه، برای اهداف این آزمایشگاه کد، ما فقط از یک زیرشبکه از یک منطقه واحد استفاده خواهیم کرد.
- در این زیرشبکه، مطمئن شوید که یک قانون فایروال وجود دارد که به همه ورودیها از tcp:22 اجازه میدهد، که برای SSH ضروری است. این قانون هنگام ایجاد شبکه، در بخش قوانین فایروال قابل انتخاب خواهد بود، بنابراین مطمئن شوید که آن را انتخاب میکنید.
- سطل GCS. برای ذخیره منابع کار Dataproc و ذخیره دادههای پردازششده، به یک سطل ذخیرهسازی Google Cloud نیاز دارید. اگر یکی ندارید، میتوانید یکی در پروژه GCP خود ایجاد کنید .
متغیرهای محیطی را پر کنید
در ترمینال خود که gcloud CLI را اجرا میکنید، این متغیرهای محیطی را پر کنید تا بعداً بتوان از آنها استفاده کرد.
export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>
موارد زیر را جایگزین کنید:
-
<project-id>به همراه نام پروژه GCP که راهاندازی کردهاید. -
<project-number>با نام شماره پروژه از مرحله پیشنیاز ۱. -
<region>با نام منطقهای از مناطق و نواحی موجود که میخواهید استفاده کنید. برای مثال، میتوانیمus-central1استفاده کنیم. -
<zone>با نام منطقه از مناطق موجود و مناطق تحت منطقهای که قبلاً انتخاب کردهاید. به عنوان مثال، اگرus-central1به عنوان منطقه انتخاب کردهاید، میتوانیدus-central1-fبه عنوان منطقه استفاده کنید. این منطقه برای ایجاد ماشین مجازی GCE که دستگاههای اینترنت اشیا را شبیهسازی میکند، استفاده خواهد شد. مطمئن شوید که منطقه شما در منطقهای که انتخاب کردهاید قرار دارد . -
<subnet-path>با مسیر کامل زیرشبکه از مرحله پیشنیاز ۲. مقدار این باید به فرمتprojects/<project-id>/regions/<region>/subnetworks/<subnet-name>باشد. -
<bucket-name>با نام سطل GCS از مرحله پیشنیاز ۳.
۳. راهاندازی کافکای مدیریتشده توسط گوگل
این بخش یک خوشه مدیریتشدهی کافکا توسط گوگل راهاندازی میکند که سرور کافکا را مستقر میکند و یک تاپیک روی این سرور ایجاد میکند که دادههای اینترنت اشیا پس از عضویت در آن میتوانند منتشر و خوانده شوند. DemoIOT Solutions میتواند این خوشه را طوری تنظیم کند که تمام دستگاههایشان دادهها را در آن منتشر کنند.
ایجاد یک خوشه مدیریتشده کافکا
- خوشه مدیریتشده کافکا را ایجاد کنید. در اینجا، نام خوشه
kafka-iotاست.
gcloud managed-kafka clusters create kafka-iot \
--project=$PROJECT_ID \
--location=$REGION \
--cpu=3 \
--memory=12GiB \
--subnets=$SUBNET_PATH \
--auto-rebalance
شما باید پاسخی مشابه زیر دریافت کنید:
Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.
Created cluster [kafka-iot].
ایجاد خوشه 20 تا 30 دقیقه طول میکشد. منتظر اتمام این عملیات باشید.
ایجاد یک موضوع
- یک تاپیک مدیریتشدهی کافکا روی کلاستر ایجاد کنید. در اینجا، نام تاپیک
kafka-iot-topicاست.
gcloud managed-kafka topics create kafka-iot-topic \
--cluster=kafka-iot \
--location=$REGION \
--partitions=10 \
--replication-factor=3
شما باید خروجی مشابه زیر دریافت کنید:
Created topic [kafka-iot-topic].
۴. یک ناشر راهاندازی کنید
برای انتشار در خوشه مدیریتشده کافکا، ما یک نمونه ماشین مجازی Google Compute Engine راهاندازی میکنیم که میتواند به VPC حاوی زیرشبکه مورد استفاده توسط خوشه مدیریتشده کافکا دسترسی داشته باشد. این ماشین مجازی، دستگاههای حسگر ارائه شده توسط DemoIOT Solutions را شبیهسازی میکند.
مراحل
- نمونه ماشین مجازی Google Compute Engine را ایجاد کنید. در اینجا، نام ماشین مجازی GCE،
publisher-instanceاست.
gcloud compute instances create publisher-instance \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=$SUBNET_PATH \
--zone=$ZONE
- به حساب کاربری سرویس پیشفرض Google Compute Engine مجوزهای استفاده از سرویس مدیریتشده برای آپاچی کافکا را بدهید.
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
- برای اتصال به ماشین مجازی از SSH استفاده کنید. روش دیگر، استفاده از کنسول Google Cloud برای SSH است .
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
- برای اجرای ابزارهای خط فرمان کافکا، جاوا را نصب کنید و با استفاده از این دستورات، فایل باینری کافکا را دانلود کنید.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
- کتابخانه احراز هویت مدیریتشده کافکا و وابستگیهای آن را دانلود کنید و ویژگیهای کلاینت کافکا را پیکربندی کنید.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
برای جزئیات بیشتر در مورد تنظیمات دستگاه ناشر، به «تنظیم دستگاه کلاینت» مراجعه کنید.
۵. انتشار در Managed Kafka
اکنون که ناشر راهاندازی شده است، میتوانیم از خط فرمان کافکا روی آن برای انتشار برخی دادههای ساختگی از ماشین مجازی GCE (شبیهسازی دستگاههای اینترنت اشیا توسط DemoIOT Solutions) به خوشه مدیریتشده کافکا استفاده کنیم.
- از آنجایی که به نمونه ماشین مجازی GCE از طریق SSH متصل شدهایم، باید متغیر
PROJECT_IDرا دوباره پر کنیم:
export PROJECT_ID=<project-id>
export REGION=<region>
موارد زیر را جایگزین کنید:
-
<project-id>به همراه نام پروژه GCP که راهاندازی کردهاید. -
<region>شامل منطقهای که خوشه کافکا در آن ایجاد شده است
- از دستور
managed-kafka clusters describeبرای دریافت آدرس IP سرور بوتاسترپ کافکا استفاده کنید. از این آدرس میتوان برای اتصال به خوشه کافکا استفاده کرد.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- موضوعات موجود در خوشه را فهرست کنید:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties
شما باید بتوانید خروجی زیر را ببینید که شامل تاپیک kafka-iot-topic که قبلاً ایجاد کردیم.
__remote_log_metadata
kafka-iot-topic
- این اسکریپت را کپی کرده و در یک فایل جدید
publish_iot_data.shقرار دهید. برای ایجاد یک فایل جدید در ماشین مجازی GCE، میتوانید از ابزاری مانندvimیاnanoاستفاده کنید.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash
NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10
generate_sensor_data() {
local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
local light_level=$((RANDOM % 1000))
echo "\"temperature\": $temperature,"
echo "\"humidity\": $humidity,"
echo "\"pressure\": $pressure,"
echo "\"light_level\": $light_level"
}
generate_location_data() {
local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))
echo "\"latitude\": $latitude,"
echo "\"longitude\": $longitude"
}
generate_device_status() {
local battery_level=$((RANDOM % 101))
local signal_strength=$((RANDOM % 80 - 100))
local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"
echo "\"battery_level\": $battery_level,"
echo "\"signal_strength\": $signal_strength,"
echo "\"connection_type\": \"$connection_type\""
}
publish_to_kafka() {
local device_index=$1
local message_index=$2
local device_id="sensor-$((device_index % NUM_IDS))"
local timestamp=$((start_time + (message_index * message_interval)))
local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")
local json_data=$(cat <<EOF
{
"device_id": "$device_id",
"timestamp": "$date",
"location": {
$(generate_location_data)
},
"sensor_data": {
$(generate_sensor_data)
},
"device_status": {
$(generate_device_status)
},
"metadata": {
"sensor_type": "environmental",
"unit_temperature": "Celsius",
"unit_humidity": "%" ,
"unit_pressure": "hPa",
"unit_light_level": "lux",
"firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
}
}
EOF
)
echo $json_data | jq -rc
}
for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
for device_index in $(seq 0 $((NUM_IDS - 1))); do
publish_to_kafka "$device_index" "$message_index"
done
done | kafka-console-producer.sh \
--topic kafka-iot-topic \
--bootstrap-server $1 \
--producer.config $2
توضیح
- این اسکریپت پیامهای JSON با خوانشهای شبیهسازیشده حسگر ایجاد میکند که شامل شناسه دستگاه، مهر زمانی، دادههای حسگر (دما، رطوبت، فشار، نور)، اطلاعات مکان (عرض جغرافیایی، طول جغرافیایی)، وضعیت دستگاه (باتری، سیگنال، نوع اتصال) و برخی فرادادهها است.
- این سیستم، جریان پیوستهای از پیامها را از تعداد مشخصی دستگاه منحصر به فرد تولید میکند که هر کدام در یک بازه زمانی مشخص، دادهها را ارسال میکنند و دستگاههای اینترنت اشیا در دنیای واقعی را تقلید میکنند. در اینجا، ما دادههای 10 دستگاه را منتشر میکنیم که هر کدام 20 خوانش را در یک بازه زمانی 10 ثانیهای تولید میکنند.
- همچنین تمام دادههای تولید شده را با استفاده از ابزار خط فرمان تولیدکننده کافکا، در تاپیک کافکا منتشر میکند.
- برخی از وابستگیهای مورد استفاده اسکریپت را نصب کنید - بسته
bcبرای محاسبات ریاضی و بستهjqبرای پردازش JSON.
sudo apt-get install bc jq
- اسکریپت را به یک فایل اجرایی تغییر دهید و اجرا کنید. اجرای آن حدود ۲ دقیقه طول میکشد.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties
با اجرای این دستور که تمام رویدادها را چاپ میکند، میتوانید بررسی کنید که رویدادها با موفقیت منتشر شدهاند. برای خروج <control-c> را فشار دهید.
kafka-console-consumer.sh \
--topic kafka-iot-topic \
--from-beginning \
--bootstrap-server $BOOTSTRAP \
--consumer.config client.properties
۶. راهاندازی خوشه Dataproc
این بخش یک کلاستر Dataproc در زیرشبکه VPC ایجاد میکند که کلاستر Managed Kafka در آن قرار دارد. این کلاستر برای اجرای کارهایی که آمار و بینشهای بلادرنگ مورد نیاز DemoIOT Solutions را تولید میکنند، استفاده خواهد شد.
- یک کلاستر Dataproc ایجاد کنید. در اینجا نام کلاستر
dataproc-iotاست.
gcloud dataproc clusters create dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION \
--image-version=2.2-debian12 \
--enable-component-gateway \
--subnet=$SUBNET_PATH \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--num-workers=2 \
--properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G
شما باید پاسخی مشابه زیر دریافت کنید:
Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.
Waiting for cluster creation operation...done.
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].
ایجاد خوشه ممکن است ۱۰ تا ۱۵ دقیقه طول بکشد. منتظر اتمام موفقیتآمیز این عملیات باشید و با توصیف خوشه، بررسی کنید که خوشه در حالت RUNNING باشد.
gcloud dataproc clusters describe dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION
۷. پردازش پیامهای کافکا با استفاده از Dataproc
در این بخش آخر، شما یک کار Dataproc ارسال خواهید کرد که پیامهای منتشر شده را با استفاده از Spark Streaming پردازش میکند. این کار در واقع برخی آمارها و بینشهای بلادرنگ تولید میکند که میتواند توسط DemoIOT Solutions مورد استفاده قرار گیرد.
- این دستور را اجرا کنید تا فایل کار استریمینگ PySpark با نام
process_iot.pyبه صورت محلی ایجاد شود.
cat > process_iot.py <<EOF
#!/bin/python
import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException
JSON_SCHEMA = StructType([
StructField("device_id", StringType()),
StructField("timestamp", StringType()),
StructField(
"location",
StructType([
StructField("latitude", FloatType()),
StructField("longitude", FloatType()),
]),
),
StructField(
"sensor_data",
StructType([
StructField("temperature", FloatType()),
StructField("humidity", FloatType()),
StructField("pressure", FloatType()),
StructField("light_level", IntegerType()),
]),
),
StructField(
"device_status",
StructType([
StructField("battery_level", IntegerType()),
StructField("signal_strength", IntegerType()),
StructField("connection_type", StringType()),
]),
),
StructField(
"metadata",
StructType([
StructField("sensor_type", StringType()),
StructField("unit_temperature", StringType()),
StructField("unit_humidity", StringType()),
StructField("unit_pressure", StringType()),
StructField("unit_light_level", StringType()),
StructField("firmware_version", StringType()),
]),
),
])
CLIENT_PROPERTY_KEYS = [
"security.protocol",
"sasl.mechanism",
"sasl.login.callback.handler.class",
"sasl.jaas.config",
]
def get_client_properties(client_properties_path: str):
# Parse client properties file
parsed_path = urlparse(client_properties_path)
if parsed_path.scheme != "gs":
raise ValueError("Invalid GCS path for client properties. Must start with gs://.")
bucket_name = parsed_path.netloc
blob_name = parsed_path.path.lstrip("/")
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
file_content = "[DEFAULT]\n" + blob.download_as_text()
config = configparser.ConfigParser()
config.read_string(file_content)
client_properties = dict()
for key in CLIENT_PROPERTY_KEYS:
client_properties[key] = config.get("DEFAULT", key)
print(f"Client properties: {client_properties}")
return client_properties
def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
print("Starting initial data processing...")
initial_rows = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_server_address)
.option("startingOffsets", "earliest")
.option("subscribe", "kafka-iot-topic")
.option("kafka.security.protocol", client_properties["security.protocol"])
.option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
.option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
.option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
.load()
)
initial_rows = (
initial_rows.selectExpr("CAST(value AS STRING)")
.select(from_json(col("value"), JSON_SCHEMA).alias("data"))
.select("data.*")
)
# Print first 20 rows
def print_top_rows(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
batch_df.limit(20).show(truncate=False)
initial_query_print = initial_rows.writeStream \
.foreachBatch(print_top_rows) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_print)
# Calculate and print average temperatures
def process_initial_avg_temp(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
current_averages = (
batch_df.select("device_id", "sensor_data.temperature")
.groupBy("device_id")
.agg(avg("temperature").alias("average_temperature"))
.orderBy("device_id")
)
current_averages.show(truncate=False)
initial_query_avg_temp = initial_rows.writeStream \
.foreachBatch(process_initial_avg_temp) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_avg_temp)
# Write data to GCS
initial_data_gcs_writer = (
initial_rows.writeStream.outputMode("append")
.format("avro")
.option("path", store_data_gcs_path+"/tables/iot_avro/")
.option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
.trigger(once=True) \
.start()
)
queries_to_await.append(initial_data_gcs_writer)
def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):
client_properties = get_client_properties(client_properties_path)
# Create SparkSession
spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
queries_to_await = []
process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
# Wait for all queries to terminate
for query in queries_to_await:
try:
query.awaitTermination()
except Exception as e:
print(f"Error awaiting query: {e}")
finally:
query.stop()
spark.stop()
if __name__ == "__main__":
if len(sys.argv) < 4:
print("Invalid number of arguments passed ", len(sys.argv))
print(
"Usage: ",
sys.argv[0],
" <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
)
raise ValueError("Invalid number of arguments passed.")
parsed_data_path = urlparse(sys.argv[3])
if parsed_data_path.scheme != "gs":
raise ValueError("Invalid GCS path for storing data. Must start with gs://.")
main(sys.argv[1], sys.argv[2], sys.argv[3])
EOF
توضیح
- این کد یک کار جریانسازی ساختاریافتهی PySpark را برای خواندن دادهها از یک موضوع مشخصشدهی کافکا تنظیم میکند. این کار از آدرس بوتاسترپ سرور کافکا ارائهشده و پیکربندیهای کافکا که از یک فایل پیکربندی GCS بارگذاری شدهاند، برای اتصال و احراز هویت با کارگزار کافکا استفاده میکند.
- ابتدا دادههای خام را از کافکا به عنوان جریانی از آرایههای بایت میخواند، و آن آرایههای بایت را به رشته تبدیل میکند، و
json_schemaرا با استفاده از StructType اسپارک برای مشخص کردن ساختار دادهها (شناسه دستگاه، مهر زمانی، مکان، دادههای حسگر و غیره) اعمال میکند. - این سیستم، 10 ردیف اول را برای پیشنمایش در کنسول چاپ میکند، میانگین دما را برای هر حسگر محاسبه میکند و تمام دادهها را با فرمت
avroدر سطل GCS مینویسد. Avro یک سیستم سریالسازی داده مبتنی بر ردیف است که دادههای ساختاریافته را به طور مؤثر در یک قالب دودویی فشرده و تعریفشده توسط طرحواره ذخیره میکند و تکامل طرحواره، بیطرفی زبان و فشردهسازی بالا را برای پردازش دادههای در مقیاس بزرگ ارائه میدهد.
- فایل
client.propertiesرا ایجاد کنید و متغیر محیطی را برای آدرس bootstrap سرور kafka پر کنید.
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- فایلهای
process_iot.pyوclient.propertiesرا در مخزن ذخیرهسازی ابری گوگل خود آپلود کنید تا بتوانند توسط کار Dataproc مورد استفاده قرار گیرند.
gsutil cp process_iot.py client.properties $BUCKET
- برخی از فایلهای jar مربوط به وابستگیهای مربوط به کار Dataproc را در مخزن GCS خود کپی کنید. این دایرکتوری شامل jarهایی است که برای اجرای کارهای Spark Streaming با Kafka مورد نیاز هستند، و کتابخانه احراز هویت Managed Kafka و وابستگیهای آن، که از Setup a client machine گرفته شده است.
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
- کار Spark را به خوشه Dataproc ارسال کنید.
gcloud dataproc jobs submit pyspark \
$BUCKET/process_iot.py \
--project=$PROJECT_ID \
--region=$REGION \
--cluster=dataproc-iot \
--properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
-- $BOOTSTRAP $BUCKET/client.properties $BUCKET
گزارشهای درایور اسپارک چاپ خواهند شد. همچنین باید بتوانید این جداول ثبتشده در کنسول و دادههای ذخیرهشده در سطل GCS خود را مشاهده کنید.
25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp |location |sensor_data |device_status |metadata |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954 |
|sensor-1 |20.475 |
|sensor-2 |20.475 |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138 |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422 |
|sensor-9 |20.405000019073487 |
+---------+-------------------+
۸. تمیز کردن
پس از تکمیل codelab، مراحل پاکسازی منابع را دنبال کنید.
- خوشه مدیریتشده کافکا، ماشین مجازی ناشر GCE و خوشه Dataproc را حذف کنید.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
- زیرشبکه و شبکه VPC خود را حذف کنید.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
- اگر دیگر نمیخواهید از دادهها استفاده کنید، سطل GCS خود را حذف کنید.
gcloud storage rm --recursive $BUCKET
۹. تبریک
تبریک میگویم، شما با موفقیت یک خط لوله پردازش دادههای اینترنت اشیا با Manage Kafka و Dataproc ایجاد کردهاید که به DemoIOT Solutions کمک میکند تا بینشهای بلادرنگ در مورد دادههای منتشر شده توسط دستگاههای خود کسب کند!
شما یک کلاستر Managed Kafka ایجاد کردید، رویدادهای IoT را در آن منتشر کردید و یک کار Dataproc را اجرا کردید که از Spark streaming برای پردازش این رویدادها به صورت بلادرنگ استفاده میکرد. اکنون مراحل کلیدی مورد نیاز برای ایجاد خطوط لوله داده با استفاده از Managed Kafka و Dataproc را میدانید.