עיבוד נתוני IoT בזמן אמת באמצעות Dataproc ו-Google Managed Service for Apache Kafka
מידע על Codelab זה
1. מבוא
תאריך העדכון האחרון: 10 ביוני 2024
רקע
מכשירי האינטרנט של הדברים (IoT), החל מפתרונות לבית חכם ועד חיישנים תעשייתיים, יוצרים כמויות אדירות של נתונים בקצה הרשת. הנתונים האלה הם נכס חשוב במגוון תרחישים לדוגמה, כמו מעקב אחרי מכשירים, מעקב, אבחון, מעקב, התאמה אישית, אופטימיזציה של צי כלי רכב ועוד. Google Managed Service for Apache Kafka הוא שירות מנוהל שמאפשר להטמיע ולשמור את מקור הנתונים המתמשך הזה בצורה גמישה ועמידה, באופן מאובטח וקל לשימוש, ותואמת ל-OSS. בנוסף, Google Cloud Dataproc מאפשר עיבוד של מערכי הנתונים הגדולים האלה לצורך ניתוח נתונים באמצעות אשכולות Apache Spark ו-Hadoop.
מה תפַתחו
בקודלאב הזה תלמדו ליצור צינור עיבוד נתונים של IoT באמצעות השירות המנוהל של Google ל-Apache Kafka, Dataproc, Python ו-Apache Spark, שמבצע ניתוח נתונים בזמן אמת. צינור עיבוד הנתונים יבצע את הפעולות הבאות:
- פרסום נתונים ממכשירי IoT לאשכול Kafka מנוהל באמצעות מכונות וירטואליות של GCE
- העברת נתונים בסטרימינג מאשכול Manage Kafka לאשכול Dataproc
- עיבוד נתונים באמצעות משימה של Dataproc Spark Streaming
מה תלמדו
- איך יוצרים אשכולות של Kafka ו-Dataproc בניהול Google
- איך מריצים משימות סטרימינג באמצעות Dataproc
מה צריך להכין
- חשבון GCP פעיל עם פרויקט מוגדר. אם אין לכם חשבון, תוכלו להירשם לתקופת ניסיון בחינם.
- ה-CLI של gcloud מותקן ומוגדר. אפשר לפעול לפי ההוראות להתקנת ה-CLI של gcloud במערכת ההפעלה.
- ממשקי API מופעלים ל-Google Managed Kafka ול-Dataproc בפרויקט GCP.
2. סקירה כללית
בקודלאב הזה נלווה לסיפור של חברה מדומה, DemoIOT Solutions. DemoIOT Solutions מספקת מכשירי חיישנים שמודדים ומעבירים נתונים לגבי טמפרטורה, לחות, לחץ, רמת תאורה ומיקום. הם רוצים להגדיר צינורות עיבוד נתונים שיעבדו את הנתונים האלה כדי להציג ללקוחות נתונים סטטיסטיים בזמן אמת. באמצעות צינורות עיבוד נתונים כאלה, הם יכולים לספק ללקוחות מגוון רחב של שירותים, כמו מעקב, הצעות אוטומטיות, התראות ותובנות לגבי המקומות שבהם הלקוחות התקינו את החיישנים שלהם.
לשם כך, נשתמש ב-VM של GCE כדי לדמות את מכשיר ה-IoT. המכשיר יפרסם נתונים בנושא Kafka באשכולות Kafka המנוהלים על ידי Google, והם יקראו ויעובדו על ידי משימה של סטרימינג ב-Dataproc. ההגדרה של התנאים המוקדמים והדפים הבאים יעזרו לכם לבצע את כל השלבים האלה.
הגדרת דרישות מוקדמות
- מחפשים את שם הפרויקט ומספר הפרויקט. למידע נוסף, אפשר לעיין במאמר איך מוצאים את השם, המספר והמזהה של הפרויקט.
- רשת משנה של VPC. כך תוכלו לקשר בין המכונה הווירטואלית ב-GCE, אשכול Kafka ואשכול Dataproc. כאן מוסבר איך להציג רשימה של תת-רשתות קיימות באמצעות ה-CLI של gcloud. אם צריך, פועלים לפי ההוראות ליצירת רשת VPC במצב אוטומטי, כדי ליצור רשת VPC עם רשת משנה בכל אזור ב-Google Cloud. עם זאת, לצורך הקודלאב הזה נשתמש ברשת משנה מאזור אחד בלבד.
- ברשת המשנה הזו, חשוב לוודא שיש כלל של חומת אש שמאפשר את כל תעבורת הנתונים הנכנסת (ingress) מ-tcp:22, שהוא ה-SSH הנדרש. הכלל הזה יהיה זמין לבחירה בקטע 'כללי חומת אש' כשיוצרים רשת, לכן חשוב לבחור אותו.
- קטגוריה ב-GCS. תצטרכו גישה לקטגוריית אחסון ב-Google Cloud כדי לאחסן משאבי משימות של Dataproc ולשמור נתונים שעברו עיבוד. אם אין לכם חשבון, אתם יכולים ליצור אותו בפרויקט GCP.
איך מאכלסים את משתני הסביבה
בתחנה שבה אתם מריצים את ה-CLI של gcloud, מאכלסים את משתני הסביבה האלה כדי שתוכלו להשתמש בהם בהמשך.
export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>
מחליפים את מה שכתוב בשדות הבאים:
<project-id>
בשם הפרויקט ב-GCP שהגדרתם.<project-number>
בשם של מספר הפרויקט שמופיע בשלב 1 של התנאי המקדים.<region>
בשם של אזור מתוך האזורים והתחומים הזמינים שבהם רוצים להשתמש. לדוגמה, אפשר להשתמש ב-us-central1
.<zone>
בשם של האזור מתוך אזורים ותחומים זמינים בקטע של האזור שבחרתם קודם. לדוגמה, אם בחרתם אתus-central1
כאזור, תוכלו להשתמש ב-us-central1-f
כתחום. האזור הזה ישמש ליצירת המכונה הווירטואלית ב-GCE שמסימולטת מכשירי IoT. מוודאים שהתחום נמצא באזור שבחרתם.<subnet-path>
עם הנתיב המלא של תת-הרשת משלב 2 של התנאי המקדים. הערך של השדה הזה חייב להיות בפורמט:projects/<project-id>/regions/<region>/subnetworks/<subnet-name>
.<bucket-name>
בשם הקטגוריה ב-GCS שמופיע בשלב 3 של התנאי המקדים.
3. הגדרת Google Managed Kafka
בקטע הזה מגדירים אשכול Kafka מנוהל של Google, שמפרסם את שרת Kafka ויוצר נושא בשרת הזה שבו אפשר לפרסם את נתוני ה-IoT ולקרוא אותם אחרי שמירת מינויים לנושא. צוות DemoIOT Solutions יכול להגדיר את האשכולות כך שכל המכשירים שלהם יפרסמו נתונים אליהם.
יצירת אשכול Kafka מנוהל
- יוצרים את האשכול של Managed Kafka. כאן השם של האשכול הוא
kafka-iot
.
gcloud managed-kafka clusters create kafka-iot \
--project=$PROJECT_ID \
--location=$REGION \
--cpu=3 \
--memory=12GiB \
--subnets=$SUBNET_PATH \
--auto-rebalance
אמורה להופיע תגובה שדומה לזו:
Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.
Created cluster [kafka-iot].
יצירת האשכולות נמשכת 20 עד 30 דקות. ממתינים לסיום הפעולה.
יצירת נושא
- יוצרים נושא Kafka מנוהל באשכול. כאן, שם הנושא הוא
kafka-iot-topic
.
gcloud managed-kafka topics create kafka-iot-topic \
--cluster=kafka-iot \
--location=$REGION \
--partitions=10 \
--replication-factor=3
הפלט אמור להיראות כך:
Created topic [kafka-iot-topic].
4. הגדרת בעל תוכן דיגיטלי
כדי לפרסם באשכול Kafka המנוהל, אנחנו מגדירים מכונה וירטואלית ב-Google Compute Engine שיכולה לגשת ל-VPC שמכיל את תת-הרשת שבה נעשה שימוש באשכול Kafka המנוהל. המכונה הווירטואלית הזו מדמה את מכשירי החיישנים שסופקו על ידי DemoIOT Solutions.
שלבים
- יוצרים את המכונה הווירטואלית ב-Google Compute Engine. כאן, השם של המכונה הווירטואלית ב-GCE הוא
publisher-instance
.
gcloud compute instances create publisher-instance \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=$SUBNET_PATH \
--zone=$ZONE
- נותנים לחשבון השירות שמוגדר כברירת מחדל ב-Google Compute Engine את ההרשאות לשימוש בשירות המנוהל ל-Apache Kafka.
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
- משתמשים ב-SSH כדי להתחבר ל-VM. לחלופין, אפשר להשתמש במסוף Google Cloud ל-SSH.
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
- כדי להריץ את הכלים של שורת הפקודה של Kafka, צריך להתקין את Java ולהוריד את הקובץ הבינארי של Kafka באמצעות הפקודות הבאות.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
- מורידים את ספריית האימות של Managed Kafka ואת יחסי התלות שלה, ומגדירים את מאפייני הלקוח של Kafka.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
פרטים נוספים על הגדרת המכונה של בעל התוכן הדיגיטלי זמינים במאמר הגדרת מכונת לקוח.
5. פרסום ב-Managed Kafka
עכשיו, אחרי שהגדרתם את הכלי לפרסום, תוכלו להשתמש בשורת הפקודה של Kafka כדי לפרסם נתונים מדומים מהמכונה הווירטואלית של GCE (הדמיה של מכשירי IoT על ידי DemoIOT Solutions) לאשכולות Kafka המנוהלים.
- מאחר שהיינו מחוברים למכונה הווירטואלית ב-GCE באמצעות SSH, אנחנו צריכים לאכלס מחדש את המשתנה
PROJECT_ID
:
export PROJECT_ID=<project-id>
export REGION=<region>
מחליפים את מה שכתוב בשדות הבאים:
<project-id>
בשם הפרויקט ב-GCP שהגדרתם.- מחליפים את
<region>
באזור שבו נוצר אשכול Kafka
- משתמשים בפקודה
managed-kafka clusters describe
כדי לקבל את כתובת ה-IP של שרת ה-bootstrap של Kafka. אפשר להשתמש בכתובת הזו כדי להתחבר לאשכולות Kafka.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- מציגים את הנושאים באשכול:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties
הפלט הבא אמור להופיע, והוא מכיל את הנושא kafka-iot-topic
שיצרנו מקודם.
__remote_log_metadata
kafka-iot-topic
- מעתיקים ומדביקים את הסקריפט הזה בקובץ חדש בשם
publish_iot_data.sh
. כדי ליצור קובץ חדש ב-VM של GCE, אפשר להשתמש בכלי כמוvim
אוnano
.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash
NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10
generate_sensor_data() {
local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
local light_level=$((RANDOM % 1000))
echo "\"temperature\": $temperature,"
echo "\"humidity\": $humidity,"
echo "\"pressure\": $pressure,"
echo "\"light_level\": $light_level"
}
generate_location_data() {
local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))
echo "\"latitude\": $latitude,"
echo "\"longitude\": $longitude"
}
generate_device_status() {
local battery_level=$((RANDOM % 101))
local signal_strength=$((RANDOM % 80 - 100))
local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"
echo "\"battery_level\": $battery_level,"
echo "\"signal_strength\": $signal_strength,"
echo "\"connection_type\": \"$connection_type\""
}
publish_to_kafka() {
local device_index=$1
local message_index=$2
local device_id="sensor-$((device_index % NUM_IDS))"
local timestamp=$((start_time + (message_index * message_interval)))
local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")
local json_data=$(cat <<EOF
{
"device_id": "$device_id",
"timestamp": "$date",
"location": {
$(generate_location_data)
},
"sensor_data": {
$(generate_sensor_data)
},
"device_status": {
$(generate_device_status)
},
"metadata": {
"sensor_type": "environmental",
"unit_temperature": "Celsius",
"unit_humidity": "%" ,
"unit_pressure": "hPa",
"unit_light_level": "lux",
"firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
}
}
EOF
)
echo $json_data | jq -rc
}
for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
for device_index in $(seq 0 $((NUM_IDS - 1))); do
publish_to_kafka "$device_index" "$message_index"
done
done | kafka-console-producer.sh \
--topic kafka-iot-topic \
--bootstrap-server $1 \
--producer.config $2
הסבר
- הסקריפט הזה יוצר הודעות JSON עם קריאות חיישנים מדומינות שכוללות את מזהה המכשיר, חותמת זמן, נתוני חיישנים (טמפרטורה, לחות, לחץ, אור), פרטי מיקום (קו הרוחב, קו האורך), סטטוס המכשיר (סוללה, אות, סוג חיבור) ומטא-נתונים מסוימים.
- הוא יוצר זרימה רציפה של הודעות ממספר מוגדר של מכשירים ייחודיים, שכל אחד מהם שולח נתונים במרווח זמן מסוים, כדי לחקות מכשירים של IoT בעולם האמיתי. כאן אנחנו מפרסמים נתונים מ-10 מכשירים שמפיקים 20 קריאות כל אחד, במרווח זמן של 10 שניות.
- הוא גם מפרסם את כל הנתונים שנוצרו בנושא Kafka באמצעות כלי שורת הפקודה של Kafka ליצירת אירועים.
- מתקינים כמה יחסי תלות שבהם משתמש הסקריפט – החבילה
bc
לחישוב מתמטיים והחבילהjq
לעיבוד JSON.
sudo apt-get install bc jq
- משנים את הסקריפט כך שיהיה קובץ הפעלה ומריצים אותו. הפעלת הקוד אמורה להימשך כ-2 דקות.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties
כדי לבדוק אם האירועים פורסמו, מריצים את הפקודה הבאה שמדפיסה את כל האירועים. כדי לצאת, מקישים על <control-c>
.
kafka-console-consumer.sh \
--topic kafka-iot-topic \
--from-beginning \
--bootstrap-server $BOOTSTRAP \
--consumer.config client.properties
6. הגדרת אשכול Dataproc
בקטע הזה יוצרים אשכול Dataproc בתת-הרשת של ה-VPC שבה נמצא אשכול Kafka המנוהל. האשכולות האלה ישמשו להרצת משימות שיוצרות את הנתונים הסטטיסטיים והתובנות בזמן אמת שנדרשים ל-DemoIOT Solutions.
- יוצרים אשכול Dataproc. כאן השם של האשכול הוא
dataproc-iot
.
gcloud dataproc clusters create dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION \
--image-version=2.2-debian12 \
--enable-component-gateway \
--subnet=$SUBNET_PATH \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--num-workers=2 \
--properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G
אמורה להופיע תגובה שדומה לזו:
Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.
Waiting for cluster creation operation...done.
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].
יצירת האשכולות עשויה להימשך 10 עד 15 דקות. ממתינים לסיום הפעולה ומתארים את האשכול כדי לוודא שהוא בסטטוס RUNNING
.
gcloud dataproc clusters describe dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION
7. עיבוד הודעות Kafka באמצעות Dataproc
בקטע האחרון הזה, תשלחו משימה ב-Dataproc שתعالج את ההודעות שפורסמו באמצעות Spark Streaming. המשימה הזו יוצרת כמה נתונים סטטיסטיים ותובנות בזמן אמת שאפשר להשתמש בהם ב-DemoIOT Solutions.
- מריצים את הפקודה הזו כדי ליצור את קובץ המשימה של PySpark בסטרימינג שנקרא
process_iot.py
באופן מקומי.
cat > process_iot.py <<EOF
#!/bin/python
import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException
JSON_SCHEMA = StructType([
StructField("device_id", StringType()),
StructField("timestamp", StringType()),
StructField(
"location",
StructType([
StructField("latitude", FloatType()),
StructField("longitude", FloatType()),
]),
),
StructField(
"sensor_data",
StructType([
StructField("temperature", FloatType()),
StructField("humidity", FloatType()),
StructField("pressure", FloatType()),
StructField("light_level", IntegerType()),
]),
),
StructField(
"device_status",
StructType([
StructField("battery_level", IntegerType()),
StructField("signal_strength", IntegerType()),
StructField("connection_type", StringType()),
]),
),
StructField(
"metadata",
StructType([
StructField("sensor_type", StringType()),
StructField("unit_temperature", StringType()),
StructField("unit_humidity", StringType()),
StructField("unit_pressure", StringType()),
StructField("unit_light_level", StringType()),
StructField("firmware_version", StringType()),
]),
),
])
CLIENT_PROPERTY_KEYS = [
"security.protocol",
"sasl.mechanism",
"sasl.login.callback.handler.class",
"sasl.jaas.config",
]
def get_client_properties(client_properties_path: str):
# Parse client properties file
parsed_path = urlparse(client_properties_path)
if parsed_path.scheme != "gs":
raise ValueError("Invalid GCS path for client properties. Must start with gs://.")
bucket_name = parsed_path.netloc
blob_name = parsed_path.path.lstrip("/")
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
file_content = "[DEFAULT]\n" + blob.download_as_text()
config = configparser.ConfigParser()
config.read_string(file_content)
client_properties = dict()
for key in CLIENT_PROPERTY_KEYS:
client_properties[key] = config.get("DEFAULT", key)
print(f"Client properties: {client_properties}")
return client_properties
def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
print("Starting initial data processing...")
initial_rows = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_server_address)
.option("startingOffsets", "earliest")
.option("subscribe", "kafka-iot-topic")
.option("kafka.security.protocol", client_properties["security.protocol"])
.option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
.option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
.option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
.load()
)
initial_rows = (
initial_rows.selectExpr("CAST(value AS STRING)")
.select(from_json(col("value"), JSON_SCHEMA).alias("data"))
.select("data.*")
)
# Print first 20 rows
def print_top_rows(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
batch_df.limit(20).show(truncate=False)
initial_query_print = initial_rows.writeStream \
.foreachBatch(print_top_rows) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_print)
# Calculate and print average temperatures
def process_initial_avg_temp(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
current_averages = (
batch_df.select("device_id", "sensor_data.temperature")
.groupBy("device_id")
.agg(avg("temperature").alias("average_temperature"))
.orderBy("device_id")
)
current_averages.show(truncate=False)
initial_query_avg_temp = initial_rows.writeStream \
.foreachBatch(process_initial_avg_temp) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_avg_temp)
# Write data to GCS
initial_data_gcs_writer = (
initial_rows.writeStream.outputMode("append")
.format("avro")
.option("path", store_data_gcs_path+"/tables/iot_avro/")
.option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
.trigger(once=True) \
.start()
)
queries_to_await.append(initial_data_gcs_writer)
def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):
client_properties = get_client_properties(client_properties_path)
# Create SparkSession
spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
queries_to_await = []
process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
# Wait for all queries to terminate
for query in queries_to_await:
try:
query.awaitTermination()
except Exception as e:
print(f"Error awaiting query: {e}")
finally:
query.stop()
spark.stop()
if __name__ == "__main__":
if len(sys.argv) < 4:
print("Invalid number of arguments passed ", len(sys.argv))
print(
"Usage: ",
sys.argv[0],
" <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
)
raise ValueError("Invalid number of arguments passed.")
parsed_data_path = urlparse(sys.argv[3])
if parsed_data_path.scheme != "gs":
raise ValueError("Invalid GCS path for storing data. Must start with gs://.")
main(sys.argv[1], sys.argv[2], sys.argv[3])
EOF
הסבר
- הקוד הזה מגדיר משימה של PySpark Structured Streaming לקריאת נתונים מנושא Kafka מסוים. הוא משתמש בכתובת ה-bootstrap של שרת Kafka שסופקה ובהגדרות Kafka שנטענו מקובץ תצורה ב-GCS כדי להתחבר ל-broker של Kafka ולבצע אימות.
- קודם המערכת קוראת את הנתונים הגולמיים מ-Kafka כזרם של מערכי בייטים, מעבירה את מערכי הבייטים האלה למחרוזות ומחילה את
json_schema
באמצעות StructType של Spark כדי לציין את מבנה הנתונים (מזהה מכשיר, חותמת זמן, מיקום, נתוני חיישנים וכו'). - הפונקציה מדפיסה את 10 השורות הראשונות במסוף לצורך תצוגה מקדימה, מחשבת את הטמפרטורה הממוצעת לכל חיישן וכותבת את כל הנתונים לקטגוריה של GCS בפורמט
avro
. Avro היא מערכת לסריאליזציה של נתונים שמבוססת על שורות, שמאחסנת ביעילות נתונים מובְנים בפורמט בינארי קומפקטי שמוגדר לפי סכימה. המערכת מספקת התפתחות של סכימה, נייטרליות לשונית ודחיסת נתונים גבוהה לעיבוד נתונים בקנה מידה נרחב.
- יוצרים את הקובץ
client.properties
ומאכלסים את משתנה הסביבה של כתובת ה-bootstrap של שרת Kafka.
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- מעלים את הקבצים
process_iot.py
ו-client.properties
לקטגוריה של Google Cloud Storage, כדי שאפשר יהיה להשתמש בהם במשימה של Dataproc.
gsutil cp process_iot.py client.properties $BUCKET
- מעתיקים לקטגוריה ב-GCS כמה קובצי jar של יחסי תלות של המשימה ב-Dataproc. הספרייה הזו מכילה קובצי jar שנדרשים להרצת משימות Spark Streaming עם Kafka, וגם את ספריית האימות המנוהלת של Kafka ואת יחסי התלות שלה, שנלקחו מהמאמר הגדרת מחשב לקוח.
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
- שולחים את משימה Spark לאשכול Dataproc.
gcloud dataproc jobs submit pyspark \
$BUCKET/process_iot.py \
--project=$PROJECT_ID \
--region=$REGION \
--cluster=dataproc-iot \
--properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
-- $BOOTSTRAP $BUCKET/client.properties $BUCKET
יומני הנהג של Spark יודפסו. אמורים להופיע גם הטבלאות האלה ביומן של המסוף והנתונים ששמורים בקטגוריה שלכם ב-GCS.
25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp |location |sensor_data |device_status |metadata |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954 |
|sensor-1 |20.475 |
|sensor-2 |20.475 |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138 |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422 |
|sensor-9 |20.405000019073487 |
+---------+-------------------+
8. הסרת המשאבים
אחרי שמשלימים את הקודלאב, פועלים לפי השלבים כדי לנקות את המשאבים.
- מוחקים את אשכול Kafka המנוהל, את המכונה הווירטואלית של GCE לצורכי פרסום ואת אשכול Dataproc.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
- מוחקים את הרשת ואת רשת המשנה של ה-VPC.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
- אם אתם לא רוצים להשתמש יותר בנתונים, מוחקים את הקטגוריה ב-GCS.
gcloud storage rm --recursive $BUCKET
9. מזל טוב
מזל טוב, סיימתם ליצור צינור עיבוד נתונים של IoT באמצעות Manage Kafka ו-Dataproc, שיעזור ל-DemoIOT Solutions לקבל תובנות בזמן אמת על הנתונים שפורסמו על ידי המכשירים שלהם.
יצרתם אשכול Kafka מנוהל, פרסמתם בו אירועי IoT והפעלתם משימה ב-Dataproc שבה נעשה שימוש ב-Spark streaming כדי לעבד את האירועים האלה בזמן אמת. עכשיו אתם יודעים מהם השלבים העיקריים ליצירת צינורות עיבוד נתונים באמצעות Managed Kafka ו-Dataproc.