پردازش بیدرنگ داده های اینترنت اشیا با استفاده از Dataproc و سرویس مدیریت شده گوگل برای آپاچی کافکا
درباره این codelab
1. مقدمه
آخرین به روز رسانی: 2024-06-10
پس زمینه
دستگاههای اینترنت اشیا (IoT)، از راهحلهای خانه هوشمند گرفته تا حسگرهای صنعتی، حجم وسیعی از دادهها را در لبه شبکه تولید میکنند. این دادهها برای موارد مختلف مانند نظارت بر دستگاه، ردیابی، تشخیص، نظارت، شخصیسازی، بهینهسازی ناوگان و موارد دیگر بسیار ارزشمند هستند. Google Managed Service برای Apache Kafka روشی مقیاسپذیر و بادوام برای جذب و ذخیره این جریان پیوسته از دادهها به روشی سازگار با OSS، استفاده آسان و ایمن ارائه میدهد، در حالی که Google Cloud Dataproc امکان پردازش این مجموعه دادههای بزرگ را برای تجزیه و تحلیل دادهها با استفاده از Apache Spark و خوشههای Hadoop فراهم میکند.
چیزی که خواهی ساخت
در این نرم افزار کد، شما قصد دارید یک خط لوله پردازش داده های اینترنت اشیا با استفاده از سرویس مدیریت شده گوگل برای Apache Kafka، Dataproc، Python و Apache Spark بسازید که تجزیه و تحلیل بلادرنگ انجام می دهد. خط لوله شما:
- انتشار دادهها از دستگاههای IoT در یک خوشه کافکا مدیریت شده با استفاده از ماشینهای مجازی GCE
- دادهها را از خوشه Manage Kafka به یک خوشه Dataproc منتقل کنید
- دادهها را با استفاده از Dataproc Spark Streaming پردازش کنید
چیزی که یاد خواهید گرفت
- نحوه ایجاد خوشه های Kafka و Dataproc مدیریت شده توسط گوگل
- نحوه اجرای کارهای استریم با استفاده از Dataproc
آنچه شما نیاز دارید
- یک حساب GCP فعال با راهاندازی پروژه. اگر ندارید، می توانید برای یک دوره آزمایشی رایگان ثبت نام کنید.
- gcloud CLI نصب و پیکربندی شد. می توانید دستورالعمل های نصب gcloud CLI را در سیستم عامل خود دنبال کنید.
- API های فعال برای Google Managed Kafka و Dataproc در پروژه GCP شما.
2. نمای کلی
برای این کد، بیایید داستان یک شرکت ساختگی، DemoIOT Solutions را دنبال کنیم. DemoIOT Solutions دستگاه های حسگری را ارائه می دهد که داده ها را برای دما، رطوبت، فشار، سطح نور و مکان اندازه گیری و انتقال می دهد. آنها میخواهند خطوط لولهای راهاندازی کنند که این دادهها را پردازش کنند تا آمار بلادرنگ را به مشتریان خود نشان دهند. با استفاده از چنین خطوط لوله، آنها می توانند خدمات متنوعی را به مشتریان خود ارائه دهند، مانند نظارت، پیشنهادات خودکار، هشدارها و بینش در مورد مکان هایی که مشتریان حسگرهای خود را در آنجا نصب کرده اند.
برای این کار از GCE VM برای شبیه سازی دستگاه اینترنت اشیا استفاده می کنیم. دستگاه دادهها را در یک موضوع Kafka در خوشه Google Managed Kafka منتشر میکند که توسط یک کار پخش Dataproc خوانده و پردازش میشود. تنظیمات پیش نیاز و صفحات بعدی شما را به انجام تمام این مراحل هدایت می کند.
تنظیم پیش نیاز
- نام پروژه و شماره پروژه را برای پروژه خود پیدا کنید. برای مرجع به یافتن نام، شماره و شناسه پروژه مراجعه کنید.
- زیرشبکه VPC این امکان اتصال بین GCE VM، Cluster Kafka و Dataproc را فراهم می کند. این را دنبال کنید تا زیرشبکه های موجود را با استفاده از gcloud CLI فهرست کنید. در صورت نیاز، ایجاد یک شبکه VPC حالت خودکار را دنبال کنید که یک شبکه VPC با زیرشبکه در هر منطقه Google Cloud ایجاد می کند. اگرچه، برای هدف این نرم افزار کد، ما از یک زیرشبکه فقط از یک منطقه استفاده خواهیم کرد.
- در این زیرشبکه، اطمینان حاصل کنید که یک قانون فایروال وجود دارد که اجازه می دهد همه ورودی ها از tcp:22، که SSH مورد نیاز است، وجود داشته باشد. این قانون در هنگام ایجاد یک شبکه در بخش قوانین فایروال برای انتخاب در دسترس خواهد بود، بنابراین مطمئن شوید که آن را انتخاب کنید.
- سطل GCS. برای ذخیره منابع شغلی Dataproc و تداوم داده های پردازش شده، باید به یک سطل ذخیره سازی Google Cloud دسترسی داشته باشید. اگر ندارید، می توانید در پروژه GCP خود یکی ایجاد کنید .
پر کردن متغیرهای محیطی
در ترمینال خود جایی که gcloud CLI را اجرا میکنید، این متغیرهای محیطی را پر کنید تا بتوان بعداً از آنها استفاده کرد.
export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>
موارد زیر را جایگزین کنید:
-
<project-id>
با نام پروژه GCP که تنظیم کردید. -
<project-number>
با نام شماره پروژه از مرحله پیش نیاز 1. -
<region>
با نام منطقه ای از مناطق و مناطق موجود که می خواهید استفاده کنید. به عنوان مثال، ما می توانیمus-central1
استفاده کنیم. -
<zone>
با نام منطقه از مناطق موجود و مناطق زیر منطقه ای که قبلاً انتخاب کرده اید. به عنوان مثال، اگرus-central1
به عنوان منطقه انتخاب کرده اید، می توانیدus-central1-f
به عنوان منطقه استفاده کنید. این منطقه برای ایجاد GCE VM که دستگاههای IoT را شبیهسازی میکند استفاده میشود. اطمینان حاصل کنید که منطقه شما در منطقه ای است که انتخاب کرده اید. -
<subnet-path>
با مسیر کامل زیرشبکه از مرحله پیشنیاز 2. مقدار این باید در قالب:projects/<project-id>/regions/<region>/subnetworks/<subnet-name>
باشد. -
<bucket-name>
با نام سطل GCS از مرحله پیش نیاز 3.
3. Google Managed Kafka را راه اندازی کنید
این بخش یک خوشه Kafka مدیریت شده توسط گوگل را راه اندازی می کند که سرور کافکا را مستقر می کند و موضوعی را در این سرور ایجاد می کند که در آن داده های اینترنت اشیا پس از اشتراک در آن قابل انتشار و خواندن باشد. DemoIOT Solutions میتواند این خوشه را طوری تنظیم کند که همه دستگاههایشان دادهها را در آن منتشر کنند.
یک خوشه کافکا مدیریت شده ایجاد کنید
- خوشه مدیریت شده کافکا را ایجاد کنید. در اینجا، نام خوشه
kafka-iot
است.
gcloud managed-kafka clusters create kafka-iot \
--project=$PROJECT_ID \
--location=$REGION \
--cpu=3 \
--memory=12GiB \
--subnets=$SUBNET_PATH \
--auto-rebalance
شما باید پاسخی شبیه به زیر دریافت کنید:
Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.
Created cluster [kafka-iot].
ایجاد خوشه 20-30 دقیقه طول می کشد. منتظر تکمیل این عملیات باشید.
یک موضوع ایجاد کنید
- یک موضوع کافکا مدیریت شده در خوشه ایجاد کنید. در اینجا نام تاپیک
kafka-iot-topic
است.
gcloud managed-kafka topics create kafka-iot-topic \
--cluster=kafka-iot \
--location=$REGION \
--partitions=10 \
--replication-factor=3
شما باید خروجی مشابه زیر دریافت کنید:
Created topic [kafka-iot-topic].
4. یک Publisher راه اندازی کنید
برای انتشار در خوشه مدیریت شده کافکا، یک نمونه ماشین مجازی موتور محاسباتی Google راه اندازی کردیم که می تواند به VPC حاوی زیرشبکه استفاده شده توسط خوشه مدیریت شده کافکا دسترسی داشته باشد. این VM دستگاه های حسگر ارائه شده توسط DemoIOT Solutions را شبیه سازی می کند.
مراحل
- نمونه Google Compute Engine VM را ایجاد کنید. در اینجا، نام GCE VM
publisher-instance
است.
gcloud compute instances create publisher-instance \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=$SUBNET_PATH \
--zone=$ZONE
- به حساب سرویس پیشفرض Google Compute Engine اجازه استفاده از سرویس مدیریتشده برای آپاچی کافکا را بدهید.
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
- برای اتصال به VM از SSH استفاده کنید. از طرف دیگر، از Google Cloud Console برای SSH استفاده کنید .
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
- جاوا را برای اجرای ابزارهای خط فرمان کافکا نصب کنید و با استفاده از این دستورات کافکا باینری را دانلود کنید.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
- کتابخانه احراز هویت مدیریت شده کافکا و وابستگی های آن را دانلود کنید و ویژگی های مشتری کافکا را پیکربندی کنید.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
برای جزئیات بیشتر در مورد راه اندازی دستگاه ناشر، به راه اندازی دستگاه مشتری مراجعه کنید.
5. انتشار در مدیریت کافکا
اکنون که ناشر راهاندازی شده است، میتوانیم از خط فرمان کافکا روی آن برای انتشار برخی از دادههای ساختگی از GCE VM (شبیهسازی دستگاههای اینترنت اشیا توسط DemoIOT Solutions) در خوشه Managed Kafka استفاده کنیم.
- از آنجایی که ما SSH را به نمونه GCE VM وارد کرده ایم، باید متغیر
PROJECT_ID
دوباره پر کنیم:
export PROJECT_ID=<project-id>
export REGION=<region>
موارد زیر را جایگزین کنید:
-
<project-id>
با نام پروژه GCP که تنظیم کردید. -
<region>
با منطقه ای که خوشه کافکا در آن ایجاد شده است
- برای به دست آوردن آدرس IP سرور بوت استرپ کافکا از دستور
managed-kafka clusters describe
استفاده کنید. از این آدرس می توان برای اتصال به خوشه کافکا استفاده کرد.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- موضوعات موجود در خوشه را فهرست کنید:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties
شما باید بتوانید خروجی زیر را مشاهده کنید که حاوی مبحث kafka-iot-topic
که قبلا ایجاد کردیم.
__remote_log_metadata
kafka-iot-topic
- این اسکریپت را کپی کرده و در یک فایل جدید
publish_iot_data.sh
قرار دهید. برای ایجاد یک فایل جدید در GCE VM، می توانید از ابزاری مانندvim
یاnano
استفاده کنید.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash
NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10
generate_sensor_data() {
local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
local light_level=$((RANDOM % 1000))
echo "\"temperature\": $temperature,"
echo "\"humidity\": $humidity,"
echo "\"pressure\": $pressure,"
echo "\"light_level\": $light_level"
}
generate_location_data() {
local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))
echo "\"latitude\": $latitude,"
echo "\"longitude\": $longitude"
}
generate_device_status() {
local battery_level=$((RANDOM % 101))
local signal_strength=$((RANDOM % 80 - 100))
local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"
echo "\"battery_level\": $battery_level,"
echo "\"signal_strength\": $signal_strength,"
echo "\"connection_type\": \"$connection_type\""
}
publish_to_kafka() {
local device_index=$1
local message_index=$2
local device_id="sensor-$((device_index % NUM_IDS))"
local timestamp=$((start_time + (message_index * message_interval)))
local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")
local json_data=$(cat <<EOF
{
"device_id": "$device_id",
"timestamp": "$date",
"location": {
$(generate_location_data)
},
"sensor_data": {
$(generate_sensor_data)
},
"device_status": {
$(generate_device_status)
},
"metadata": {
"sensor_type": "environmental",
"unit_temperature": "Celsius",
"unit_humidity": "%" ,
"unit_pressure": "hPa",
"unit_light_level": "lux",
"firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
}
}
EOF
)
echo $json_data | jq -rc
}
for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
for device_index in $(seq 0 $((NUM_IDS - 1))); do
publish_to_kafka "$device_index" "$message_index"
done
done | kafka-console-producer.sh \
--topic kafka-iot-topic \
--bootstrap-server $1 \
--producer.config $2
توضیح
- این اسکریپت پیامهای JSON را با قرائتهای حسگر شبیهسازی شده ایجاد میکند که دارای شناسه دستگاه، مهر زمانی، دادههای حسگر (دما، رطوبت، فشار، نور)، اطلاعات مکان (طول جغرافیایی، طول جغرافیایی)، وضعیت دستگاه (باتری، سیگنال، نوع اتصال) و برخی فرادادهها هستند.
- این یک جریان پیوسته از پیامها را از تعداد مجموعهای از دستگاههای منحصربهفرد تولید میکند، که هر کدام دادهها را در یک بازه زمانی مشخص ارسال میکنند و دستگاههای IoT دنیای واقعی را تقلید میکنند. در اینجا، ما دادههای 10 دستگاه را منتشر میکنیم که هر کدام 20 قرائت را در فاصله زمانی 10 ثانیه انجام میدهند.
- همچنین تمام داده های تولید شده را با استفاده از ابزار خط فرمان تولیدکننده کافکا در موضوع کافکا منتشر می کند.
- برخی از وابستگی های مورد استفاده توسط اسکریپت را نصب کنید - بسته
bc
برای محاسبات ریاضی و بستهjq
برای پردازش JSON.
sudo apt-get install bc jq
- اسکریپت را به صورت اجرایی تغییر دهید و اسکریپت را اجرا کنید. باید حدود 2 دقیقه طول بکشد تا اجرا شود.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties
با اجرای این دستور که همه رویدادها را چاپ می کند، می توانید بررسی کنید که رویدادها با موفقیت منتشر شده اند. برای خروج، <control-c>
فشار دهید.
kafka-console-consumer.sh \
--topic kafka-iot-topic \
--from-beginning \
--bootstrap-server $BOOTSTRAP \
--consumer.config client.properties
6. مجموعه Dataproc را راه اندازی کنید
این بخش یک خوشه Dataproc در زیرشبکه VPC ایجاد می کند که در آن خوشه Managed Kafka وجود دارد. این خوشه برای اجرای کارهایی که آمار و بینشهای بیدرنگ مورد نیاز DemoIOT Solutions را تولید میکنند، استفاده میشود.
- یک خوشه Dataproc ایجاد کنید. در اینجا نام خوشه
dataproc-iot
است.
gcloud dataproc clusters create dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION \
--image-version=2.2-debian12 \
--enable-component-gateway \
--subnet=$SUBNET_PATH \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--num-workers=2 \
--properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G
شما باید پاسخی شبیه به زیر دریافت کنید:
Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.
Waiting for cluster creation operation...done.
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].
ایجاد خوشه ممکن است 10-15 دقیقه طول بکشد. منتظر تکمیل موفقیت آمیز این عملیات باشید و با توصیف خوشه بررسی کنید که خوشه در حالت RUNNING
است.
gcloud dataproc clusters describe dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION
7. پیام های کافکا را با استفاده از Dataproc پردازش کنید
در این بخش آخر، یک کار Dataproc ارسال می کنید که پیام های منتشر شده را با استفاده از Spark Streaming پردازش می کند. این کار در واقع برخی از آمارها و بینش های بلادرنگ را ایجاد می کند که می تواند توسط DemoIOT Solutions استفاده شود.
- این دستور را اجرا کنید تا فایل کار جاری PySpark به نام
process_iot.py
به صورت محلی ایجاد شود.
cat > process_iot.py <<EOF
#!/bin/python
import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException
JSON_SCHEMA = StructType([
StructField("device_id", StringType()),
StructField("timestamp", StringType()),
StructField(
"location",
StructType([
StructField("latitude", FloatType()),
StructField("longitude", FloatType()),
]),
),
StructField(
"sensor_data",
StructType([
StructField("temperature", FloatType()),
StructField("humidity", FloatType()),
StructField("pressure", FloatType()),
StructField("light_level", IntegerType()),
]),
),
StructField(
"device_status",
StructType([
StructField("battery_level", IntegerType()),
StructField("signal_strength", IntegerType()),
StructField("connection_type", StringType()),
]),
),
StructField(
"metadata",
StructType([
StructField("sensor_type", StringType()),
StructField("unit_temperature", StringType()),
StructField("unit_humidity", StringType()),
StructField("unit_pressure", StringType()),
StructField("unit_light_level", StringType()),
StructField("firmware_version", StringType()),
]),
),
])
CLIENT_PROPERTY_KEYS = [
"security.protocol",
"sasl.mechanism",
"sasl.login.callback.handler.class",
"sasl.jaas.config",
]
def get_client_properties(client_properties_path: str):
# Parse client properties file
parsed_path = urlparse(client_properties_path)
if parsed_path.scheme != "gs":
raise ValueError("Invalid GCS path for client properties. Must start with gs://.")
bucket_name = parsed_path.netloc
blob_name = parsed_path.path.lstrip("/")
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
file_content = "[DEFAULT]\n" + blob.download_as_text()
config = configparser.ConfigParser()
config.read_string(file_content)
client_properties = dict()
for key in CLIENT_PROPERTY_KEYS:
client_properties[key] = config.get("DEFAULT", key)
print(f"Client properties: {client_properties}")
return client_properties
def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
print("Starting initial data processing...")
initial_rows = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_server_address)
.option("startingOffsets", "earliest")
.option("subscribe", "kafka-iot-topic")
.option("kafka.security.protocol", client_properties["security.protocol"])
.option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
.option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
.option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
.load()
)
initial_rows = (
initial_rows.selectExpr("CAST(value AS STRING)")
.select(from_json(col("value"), JSON_SCHEMA).alias("data"))
.select("data.*")
)
# Print first 20 rows
def print_top_rows(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
batch_df.limit(20).show(truncate=False)
initial_query_print = initial_rows.writeStream \
.foreachBatch(print_top_rows) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_print)
# Calculate and print average temperatures
def process_initial_avg_temp(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
current_averages = (
batch_df.select("device_id", "sensor_data.temperature")
.groupBy("device_id")
.agg(avg("temperature").alias("average_temperature"))
.orderBy("device_id")
)
current_averages.show(truncate=False)
initial_query_avg_temp = initial_rows.writeStream \
.foreachBatch(process_initial_avg_temp) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_avg_temp)
# Write data to GCS
initial_data_gcs_writer = (
initial_rows.writeStream.outputMode("append")
.format("avro")
.option("path", store_data_gcs_path+"/tables/iot_avro/")
.option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
.trigger(once=True) \
.start()
)
queries_to_await.append(initial_data_gcs_writer)
def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):
client_properties = get_client_properties(client_properties_path)
# Create SparkSession
spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
queries_to_await = []
process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
# Wait for all queries to terminate
for query in queries_to_await:
try:
query.awaitTermination()
except Exception as e:
print(f"Error awaiting query: {e}")
finally:
query.stop()
spark.stop()
if __name__ == "__main__":
if len(sys.argv) < 4:
print("Invalid number of arguments passed ", len(sys.argv))
print(
"Usage: ",
sys.argv[0],
" <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
)
raise ValueError("Invalid number of arguments passed.")
parsed_data_path = urlparse(sys.argv[3])
if parsed_data_path.scheme != "gs":
raise ValueError("Invalid GCS path for storing data. Must start with gs://.")
main(sys.argv[1], sys.argv[2], sys.argv[3])
EOF
توضیح
- این کد یک کار جریان ساخت یافته PySpark را برای خواندن داده ها از یک موضوع مشخص شده کافکا تنظیم می کند. از آدرس راهانداز سرور کافکا و تنظیمات کافکا بارگیری شده از یک فایل پیکربندی GCS برای اتصال و احراز هویت با کارگزار کافکا استفاده میکند.
- ابتدا دادههای خام کافکا را بهعنوان جریانی از آرایههای بایتی میخواند و آن آرایههای بایت را به رشتهها میفرستد و
json_schema
با استفاده از StructType Spark برای تعیین ساختار دادهها (شناسه دستگاه، مهر زمانی، مکان، دادههای حسگر و غیره) اعمال میکند. - 10 ردیف اول را برای پیش نمایش در کنسول چاپ می کند، میانگین دمای هر سنسور را محاسبه می کند و تمام داده ها را در سطل GCS با فرمت
avro
می نویسد. Avro یک سیستم سریالسازی داده مبتنی بر ردیف است که دادههای ساختیافته را بهطور کارآمد در یک قالب باینری فشرده و تعریفشده از طریق طرحواره ذخیره میکند و تکامل طرحواره، بیطرفی زبان و فشردهسازی بالا را برای پردازش دادههای در مقیاس بزرگ ارائه میدهد.
- فایل
client.properties
را ایجاد کنید و متغیر محیطی را برای آدرس بوت استرپ سرور کافکا پر کنید.
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- فایل های
process_iot.py
وclient.properties
را در سطل Google Cloud Storage خود آپلود کنید تا بتوان از آنها برای کار Dataproc استفاده کرد.
gsutil cp process_iot.py client.properties $BUCKET
- برخی از شیشه های وابستگی را برای کار Dataproc در سطل GCS خود کپی کنید. این دایرکتوری حاوی jarهایی است که برای اجرای کارهای Spark Streaming با کافکا، و کتابخانه احراز هویت مدیریت شده کافکا و وابستگیهای آن، از Set up a client گرفته شده است.
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
- کار Spark را به خوشه Dataproc ارسال کنید.
gcloud dataproc jobs submit pyspark \
$BUCKET/process_iot.py \
--project=$PROJECT_ID \
--region=$REGION \
--cluster=dataproc-iot \
--properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
-- $BOOTSTRAP $BUCKET/client.properties $BUCKET
گزارش های درایور Spark چاپ خواهند شد. همچنین باید بتوانید این جداول ثبت شده در کنسول و داده های ذخیره شده در سطل GCS خود را مشاهده کنید.
25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp |location |sensor_data |device_status |metadata |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954 |
|sensor-1 |20.475 |
|sensor-2 |20.475 |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138 |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422 |
|sensor-9 |20.405000019073487 |
+---------+-------------------+
8. پاک کن
مراحل پاکسازی منابع را پس از تکمیل کد لبه دنبال کنید.
- خوشه Managed Kafka، Publisher GCE VM و Dataproc را حذف کنید.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
- زیرشبکه و شبکه VPC خود را حذف کنید.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
- اگر دیگر نمیخواهید از دادهها استفاده کنید، سطل GCS خود را حذف کنید.
gcloud storage rm --recursive $BUCKET
9. تبریک میگم
تبریک میگوییم، شما با مدیریت کافکا و دیتاپروک یک خط لوله پردازش دادههای اینترنت اشیا را با موفقیت ایجاد کردید که به راهحلهای DemoIOT کمک میکند تا بینشهای بیدرنگ درباره دادههای منتشر شده توسط دستگاههای خود کسب کنند!
شما یک خوشه مدیریت شده کافکا ایجاد کردید، رویدادهای IoT را در آن منتشر کردید و یک کار Dataproc را اجرا کردید که از جریان Spark برای پردازش این رویدادها در زمان واقعی استفاده می کرد. اکنون مراحل کلیدی مورد نیاز برای ایجاد خطوط لوله داده با استفاده از Managed Kafka و Dataproc را می دانید.