עיבוד נתוני IoT בזמן אמת באמצעות Dataproc ו-Google Managed Service for Apache Kafka

1. מבוא

2efacab8643a653b.png

עדכון אחרון: 2024-06-10

רקע

מכשירי האינטרנט של הדברים (IoT), החל מפתרונות לבית חכם ועד לחיישנים תעשייתיים, יוצרים כמויות עצומות של נתונים בקצה הרשת. הנתונים האלה חשובים מאוד למגוון תרחישי שימוש, כמו מעקב אחרי מכשירים, אבחון, מעקב, התאמה אישית, אופטימיזציה של צי המכשירים ועוד. השירות המנוהל של Google ל-Apache Kafka מציע דרך ניתנת להרחבה ועמידה להטמעה ולאחסון של זרם הנתונים הרציף הזה בצורה תואמת ל-OSS, קלה לשימוש ומאובטחת. בנוסף, Google Cloud Dataproc מאפשר לעבד את מערכי הנתונים הגדולים האלה לצורך ניתוח נתונים באמצעות אשכולות של Apache Spark ו-Hadoop.

מה תפַתחו

ב-Codelab הזה תלמדו איך ליצור פייפליין לעיבוד נתוני IoT באמצעות שירות מנוהל של Google ל-Apache Kafka, ‏ Dataproc, ‏ Python ו-Apache Spark, שמבצע ניתוח נתונים בזמן אמת. צינור עיבוד הנתונים:

  • פרסום נתונים ממכשירי IoT באשכול Kafka מנוהל באמצעות מכונות וירטואליות של GCE
  • הזרמת נתונים מהאשכול המנוהל של Kafka לאשכול של Dataproc
  • עיבוד נתונים באמצעות משימת Dataproc Spark Streaming

מה תלמדו

  • איך יוצרים אשכולות של Google Managed Kafka ו-Dataproc
  • איך מריצים משימות סטרימינג באמצעות Dataproc

מה תצטרכו

2. סקירה כללית

ב-Codelab הזה, נשתמש בסיפור של חברת דמה, DemoIOT Solutions. חברת DemoIOT Solutions מספקת מכשירי חיישנים שמודדים ומשדרים נתונים של טמפרטורה, לחות, לחץ, רמת אור ומיקום. הם רוצים להגדיר צינורות לעיבוד הנתונים האלה כדי להציג ללקוחות שלהם נתונים סטטיסטיים בזמן אמת. באמצעות צינורות כאלה, הם יכולים לספק ללקוחות מגוון רחב של שירותים, כמו ניטור, הצעות אוטומטיות, התראות ותובנות לגבי מקומות שבהם הלקוחות התקינו את החיישנים שלהם.

לשם כך, נשתמש ב-VM של GCE כדי לדמות את מכשיר ה-IoT. המכשיר יפרסם נתונים בנושא Kafka באשכול Kafka המנוהל של Google, שייקראו ויעובדו על ידי עבודת סטרימינג של Dataproc. בדף הזה ובדפים הבאים מוסבר איך לבצע את כל השלבים האלה.

הגדרות נדרשות

  1. מאתרים את שם הפרויקט ואת מספר הפרויקט. לעיון, אפשר לקרוא את המאמר בנושא איך מוצאים את השם, המספר והמזהה של הפרויקט.
  2. רשת משנה של VPC. ההגדרה הזו תאפשר קישוריות בין מכונת GCE VM, אשכול Kafka ואשכול Dataproc. כדי להציג רשימה של רשתות משנה קיימות באמצעות ה-CLI של gcloud, פועלים לפי ההוראות כאן. אם צריך, פועלים לפי ההוראות במאמר יצירת רשת VPC במצב אוטומטי כדי ליצור רשת VPC עם רשת משנה בכל אזור ב-Google Cloud. עם זאת, לצורך ה-codelab הזה, נשתמש בתת-רשת מאזור יחיד בלבד.
  • ברשת המשנה הזו, מוודאים שיש כלל חומת אש שמאפשר את כל התעבורה הנכנסת מ-TCP:22, שנדרש ל-SSH. הכלל הזה יהיה זמין לבחירה בקטע Firewall rules (כללי חומת אש) כשיוצרים רשת, לכן חשוב לבחור אותו.
  1. קטגוריית GCS. תצטרכו גישה לקטגוריית אחסון ב-Google Cloud כדי לאחסן משאבים של משימות Dataproc ולשמור נתונים מעובדים. אם אין לכם חשבון, אתם יכולים ליצור חשבון בפרויקט GCP.

איכלוס משתני סביבה

בטרמינל שבו מריצים את ה-CLI של gcloud, מאכלסים את משתני הסביבה האלה כדי שאפשר יהיה להשתמש בהם בהמשך.

export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>

מחליפים את מה שכתוב בשדות הבאים:

  • <project-id> בשם פרויקט GCP שהגדרתם.
  • <project-number> בשם של מספר הפרויקט משלב 1 של הדרישות המוקדמות.
  • מחליפים את הערך <region> בשם של אזור מתוך אזורים ותחומים זמינים שרוצים להשתמש בהם. לדוגמה, אנחנו יכולים להשתמש ב-us-central1.
  • מחליפים את <zone> בשם של התחום מתוך אזורים ותחומים זמינים באזור שבחרתם קודם. לדוגמה, אם בחרתם באזור us-central1, תוכלו להשתמש באזור us-central1-f. האזור הזה ישמש ליצירת מכונה וירטואלית של GCE שמדמה מכשירי IoT. מוודאים שאזור הזמן נמצא באזור שבחרתם.
  • <subnet-path> עם הנתיב המלא של רשת המשנה משלב 2 של הדרישות המוקדמות. הערך של המאפיין הזה צריך להיות בפורמט: projects/<project-id>/regions/<region>/subnetworks/<subnet-name>.
  • <bucket-name> בשם של קטגוריית GCS משלב 3 של הדרישות המוקדמות.

3. הגדרת Managed Kafka של Google

בקטע הזה מוגדר אשכול Kafka מנוהל של Google, שמבצע פריסה של שרת Kafka ויוצר נושא בשרת הזה שאפשר לפרסם בו את נתוני ה-IoT ולקרוא אותם אחרי שנרשמים אליו. חברת DemoIOT Solutions יכולה להגדיר את האשכול הזה כך שכל המכשירים שלה יפרסמו בו נתונים.

יצירת אשכול Managed Kafka

  • יוצרים את אשכול Managed Kafka. בדוגמה הזו, שם האשכול הוא kafka-iot.
gcloud managed-kafka clusters create kafka-iot \
    --project=$PROJECT_ID \
    --location=$REGION \
    --cpu=3 \
    --memory=12GiB \
    --subnets=$SUBNET_PATH \
    --auto-rebalance

אמורה להתקבל תגובה שדומה לזו:

Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.                                         
Created cluster [kafka-iot].

יצירת האשכול אורכת 20-30 דקות. ממתינים לסיום הפעולה.

יצירת נושא

  • יוצרים נושא מנוהל של Kafka באשכול. במקרה הזה, שם הנושא הוא kafka-iot-topic.
gcloud managed-kafka topics create kafka-iot-topic \
    --cluster=kafka-iot \
    --location=$REGION \
    --partitions=10 \
    --replication-factor=3

הפלט שמתקבל אמור להיראות כך:

Created topic [kafka-iot-topic].

4. הגדרת בעל תוכן דיגיטלי

כדי לפרסם באשכול Kafka המנוהל, הגדרנו מכונה וירטואלית ב-Google Compute Engine שיכולה לגשת ל-VPC שמכיל את רשת המשנה שבה נעשה שימוש באשכול Kafka המנוהל. המכונה הווירטואלית הזו מדמה את מכשירי החיישנים שמסופקים על ידי DemoIOT Solutions.

שלבים

  1. יוצרים מכונה וירטואלית ב-Google Compute Engine. בדוגמה הזו, שם מכונת ה-VM ב-GCE הוא publisher-instance.
gcloud compute instances create publisher-instance \
    --scopes=https://www.googleapis.com/auth/cloud-platform \
    --subnet=$SUBNET_PATH \
    --zone=$ZONE
  1. נותנים לחשבון השירות שמוגדר כברירת מחדל ב-Google Compute Engine את ההרשאות להשתמש בשירות המנוהל ל-Apache Kafka.
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role=roles/managedkafka.client
  1. משתמשים ב-SSH כדי להתחבר למכונה הווירטואלית. אפשר גם להשתמש במסוף Google Cloud כדי להתחבר באמצעות SSH.
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
  1. כדי להפעיל את כלי שורת הפקודה של Kafka, צריך להתקין את Java ולהוריד את הקובץ הבינארי של Kafka באמצעות הפקודות האלה.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz  https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
  1. מורידים את ספריית האימות של Managed Kafka ואת התלויות שלה, ומגדירים את מאפייני הלקוח של Kafka.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF

פרטים נוספים על הגדרת המכונה של בעל התוכן הדיגיטלי זמינים במאמר בנושא הגדרת מכונת לקוח.

5. פרסום ב-Managed Kafka

אחרי שמגדירים את אפליקציה לשליחת הודעות, אפשר להשתמש בשורת הפקודה של Kafka כדי לפרסם נתוני דמה ממכונה וירטואלית ב-GCE (סימולציה של מכשירי IoT על ידי DemoIOT Solutions) באשכול Kafka מנוהל.

  1. מכיוון שהתחברנו באמצעות SSH למופע של מכונה וירטואלית ב-GCE, אנחנו צריכים לאכלס מחדש את המשתנה PROJECT_ID:
export PROJECT_ID=<project-id>
export REGION=<region>

מחליפים את מה שכתוב בשדות הבאים:

  • <project-id> בשם פרויקט GCP שהגדרתם.
  • <region> עם האזור שבו נוצר אשכול Kafka
  1. משתמשים בפקודה managed-kafka clusters describe כדי לקבל את כתובת ה-IP של שרת ה-bootstrap של Kafka. אפשר להשתמש בכתובת הזו כדי להתחבר לאשכול Kafka.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. מציגים ברשימה את הנושאים באשכול:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties

אמור להופיע הפלט הבא, שכולל את הנושא kafka-iot-topic שיצרנו קודם.

__remote_log_metadata
kafka-iot-topic
  1. מעתיקים את הסקריפט הזה לקובץ חדש publish_iot_data.sh. כדי ליצור קובץ חדש במכונה הווירטואלית של GCE, אפשר להשתמש בכלי כמו vim או nano.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash

NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10

generate_sensor_data() {
  local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
  local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
  local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
  local light_level=$((RANDOM % 1000))

  echo "\"temperature\": $temperature,"
  echo "\"humidity\": $humidity,"
  echo "\"pressure\": $pressure,"
  echo "\"light_level\": $light_level"
}

generate_location_data() {
  local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
  local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))

  echo "\"latitude\": $latitude,"
  echo "\"longitude\": $longitude"
}

generate_device_status() {
  local battery_level=$((RANDOM % 101))
  local signal_strength=$((RANDOM % 80 - 100))
  local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
  local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"

  echo "\"battery_level\": $battery_level,"
  echo "\"signal_strength\": $signal_strength,"
  echo "\"connection_type\": \"$connection_type\""
}

publish_to_kafka() {
  local device_index=$1
  local message_index=$2
  local device_id="sensor-$((device_index % NUM_IDS))"
  local timestamp=$((start_time + (message_index * message_interval)))
  local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")

  local json_data=$(cat <<EOF
{
  "device_id": "$device_id",
  "timestamp": "$date",
  "location": {
$(generate_location_data)
  },
  "sensor_data": {
$(generate_sensor_data)
  },
  "device_status": {
$(generate_device_status)
  },
  "metadata": {
    "sensor_type": "environmental",
    "unit_temperature": "Celsius",
    "unit_humidity": "%" ,
    "unit_pressure": "hPa",
    "unit_light_level": "lux",
    "firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
  }
}
EOF
)

  echo $json_data | jq -rc
}

for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
  for device_index in $(seq 0 $((NUM_IDS - 1))); do
    publish_to_kafka "$device_index" "$message_index"
  done
done | kafka-console-producer.sh \
    --topic kafka-iot-topic \
    --bootstrap-server $1 \
    --producer.config $2

הסבר

  • הסקריפט הזה יוצר הודעות JSON עם קריאות חיישנים מדומה שכוללות מזהה מכשיר, חותמת זמן, נתוני חיישנים (טמפרטורה, לחות, לחץ, אור), נתוני מיקום (קו רוחב, קו אורך), מצב המכשיר (סוללה, אות, סוג חיבור) ומטא-נתונים.
  • הוא יוצר זרם רציף של הודעות ממספר מוגדר של מכשירים ייחודיים, שכל אחד מהם שולח נתונים במרווח זמן מוגדר, ומדמה מכשירי IoT בעולם האמיתי. במאמר הזה אנחנו מפרסמים נתונים מ-10 מכשירים שמפיקים 20 קריאות כל אחד, במרווח זמן של 10 שניות.
  • הוא גם מפרסם את כל הנתונים שנוצרו בנושא Kafka באמצעות כלי שורת הפקודה של Kafka producer.
  1. מתקינים כמה תלויות שמשמשות את הסקריפט – חבילת bc לחישובים מתמטיים וחבילת bc לעיבוד JSON.jq
sudo apt-get install bc jq
  1. משנים את הסקריפט כך שניתן יהיה להפעיל אותו, ומריצים אותו. הפעולה תימשך כ-2 דקות.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties

כדי לוודא שהאירועים פורסמו בהצלחה, מריצים את הפקודה הבאה שמדפיסה את כל האירועים. לוחצים על <control-c> כדי לצאת.

kafka-console-consumer.sh \
    --topic kafka-iot-topic \
    --from-beginning \
    --bootstrap-server $BOOTSTRAP \
    --consumer.config client.properties

6. הגדרת אשכול Dataproc

בקטע הזה יוצרים אשכול Dataproc בתת-הרשת של ה-VPC שבה נמצא אשכול Kafka המנוהל. האשכול הזה ישמש להרצת משימות שיוצרות את הנתונים הסטטיסטיים והתובנות בזמן אמת שנדרשים ל-DemoIOT Solutions.

  1. יוצרים אשכול Dataproc. בדוגמה הזו, שם האשכול הוא dataproc-iot.
gcloud dataproc clusters create dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION \
    --image-version=2.2-debian12 \
    --enable-component-gateway \
    --subnet=$SUBNET_PATH \
    --master-machine-type=n1-standard-4 \
    --worker-machine-type=n1-standard-4 \
    --num-workers=2 \
    --properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G

אמורה להתקבל תגובה שדומה לזו:

Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.                                                                                                                                                                      
Waiting for cluster creation operation...done.                                                                                                                                                                    
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].

יצירת האשכול עשויה להימשך 10-15 דקות. ממתינים לסיום הפעולה בהצלחה ובודקים שהאשכול נמצא במצב RUNNING על ידי תיאור האשכול.

gcloud dataproc clusters describe dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION

7. עיבוד הודעות Kafka באמצעות Dataproc

בקטע האחרון הזה, תשלחו משימת Dataproc לעיבוד ההודעות שפורסמו באמצעות Spark Streaming. העבודה הזו יוצרת נתונים סטטיסטיים ותובנות בזמן אמת שאפשר להשתמש בהם ב-DemoIOT Solutions.

  1. מריצים את הפקודה הזו כדי ליצור באופן מקומי את קובץ העבודה של PySpark לסטרימינג שנקרא process_iot.py.
cat > process_iot.py <<EOF

#!/bin/python

import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException

JSON_SCHEMA = StructType([
    StructField("device_id", StringType()),
    StructField("timestamp", StringType()),
    StructField(
        "location",
        StructType([
            StructField("latitude", FloatType()),
            StructField("longitude", FloatType()),
        ]),
    ),
    StructField(
        "sensor_data",
        StructType([
            StructField("temperature", FloatType()),
            StructField("humidity", FloatType()),
            StructField("pressure", FloatType()),
            StructField("light_level", IntegerType()),
        ]),
    ),
    StructField(
        "device_status",
        StructType([
            StructField("battery_level", IntegerType()),
            StructField("signal_strength", IntegerType()),
            StructField("connection_type", StringType()),
        ]),
    ),
    StructField(
        "metadata",
        StructType([
            StructField("sensor_type", StringType()),
            StructField("unit_temperature", StringType()),
            StructField("unit_humidity", StringType()),
            StructField("unit_pressure", StringType()),
            StructField("unit_light_level", StringType()),
            StructField("firmware_version", StringType()),
        ]),
    ),
])
CLIENT_PROPERTY_KEYS = [
    "security.protocol",
    "sasl.mechanism",
    "sasl.login.callback.handler.class",
    "sasl.jaas.config",
]


def get_client_properties(client_properties_path: str):
    # Parse client properties file
    parsed_path = urlparse(client_properties_path)
    if parsed_path.scheme != "gs":
        raise ValueError("Invalid GCS path for client properties. Must start with gs://.")

    bucket_name = parsed_path.netloc
    blob_name = parsed_path.path.lstrip("/")

    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    file_content = "[DEFAULT]\n" + blob.download_as_text()

    config = configparser.ConfigParser()
    config.read_string(file_content)

    client_properties = dict()
    for key in CLIENT_PROPERTY_KEYS:
        client_properties[key] = config.get("DEFAULT", key)

    print(f"Client properties: {client_properties}")
    return client_properties

def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
    print("Starting initial data processing...")
    initial_rows = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_server_address)
        .option("startingOffsets", "earliest")
        .option("subscribe", "kafka-iot-topic")
        .option("kafka.security.protocol", client_properties["security.protocol"])
        .option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
        .option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
        .option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
        .load()
    )

    initial_rows = (
        initial_rows.selectExpr("CAST(value AS STRING)")
        .select(from_json(col("value"), JSON_SCHEMA).alias("data"))
        .select("data.*")
    )
    
    # Print first 20 rows
    def print_top_rows(batch_df, batch_id):
        if batch_df.count() > 0:
            print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
            batch_df.limit(20).show(truncate=False)

    initial_query_print = initial_rows.writeStream \
        .foreachBatch(print_top_rows) \
        .trigger(once=True) \
        .start()
    queries_to_await.append(initial_query_print)

    # Calculate and print average temperatures
    def process_initial_avg_temp(batch_df, batch_id):
        if batch_df.count() > 0:
            print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
            current_averages = (
                batch_df.select("device_id", "sensor_data.temperature")
                .groupBy("device_id")
                .agg(avg("temperature").alias("average_temperature"))
                .orderBy("device_id")
            )
            current_averages.show(truncate=False)

    initial_query_avg_temp = initial_rows.writeStream \
        .foreachBatch(process_initial_avg_temp) \
        .trigger(once=True) \
        .start()
    queries_to_await.append(initial_query_avg_temp)
    
    # Write data to GCS
    initial_data_gcs_writer = (
        initial_rows.writeStream.outputMode("append")
        .format("avro")
        .option("path", store_data_gcs_path+"/tables/iot_avro/")
        .option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
        .trigger(once=True) \
        .start()
    )
    queries_to_await.append(initial_data_gcs_writer)


def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):

    client_properties = get_client_properties(client_properties_path)

    # Create SparkSession
    spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
    
    queries_to_await = []

    process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
    
    # Wait for all queries to terminate
    for query in queries_to_await:
        try:
            query.awaitTermination()
        except Exception as e:
            print(f"Error awaiting query: {e}")
        finally:
            query.stop()

    spark.stop()

if __name__ == "__main__":
    if len(sys.argv) < 4:
        print("Invalid number of arguments passed ", len(sys.argv))
        print(
            "Usage: ",
            sys.argv[0],
            " <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
        )
        raise ValueError("Invalid number of arguments passed.")

    parsed_data_path = urlparse(sys.argv[3])
    if parsed_data_path.scheme != "gs":
        raise ValueError("Invalid GCS path for storing data. Must start with gs://.")


    main(sys.argv[1], sys.argv[2], sys.argv[3])


EOF

הסבר

  • הקוד הזה מגדיר עבודת סטרימינג מובנה של PySpark לקריאת נתונים מנושא Kafka שצוין. הוא משתמש בכתובת ה-bootstrap של שרת Kafka ובאמצעות הגדרות Kafka שנטענו מקובץ תצורה של GCS כדי להתחבר ל-Kafka broker ולבצע אימות.
  • הוא קורא קודם את הנתונים הגולמיים מ-Kafka כזרם של מערכי בייטים, ממיר את מערכי הבייטים האלה למחרוזות ומחיל את json_schema באמצעות StructType של Spark כדי לציין את מבנה הנתונים (מזהה מכשיר, חותמת זמן, מיקום, נתוני חיישנים וכו').
  • הוא מדפיס את 10 השורות הראשונות במסוף לתצוגה מקדימה, מחשב את הטמפרטורה הממוצעת לכל חיישן וכותב את כל הנתונים לדלי GCS בפורמט avro. ‫Avro היא מערכת סריאליזציה של נתונים שמבוססת על שורות. היא מאחסנת נתונים מובְנים בצורה יעילה בפורמט בינארי קומפקטי שמוגדר על ידי סכימה, ומציעה התפתחות של סכימה, ניטרליות של שפה ודחיסה גבוהה לעיבוד נתונים בקנה מידה גדול.
  1. יוצרים את הקובץ client.properties ומאכלסים את משתנה הסביבה לכתובת ה-bootstrap של שרת Kafka.
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. מעלים קובצי process_iot.py ו-client.properties לקטגוריה של Google Cloud Storage, כדי שאפשר יהיה להשתמש בהם במשימת Dataproc.
gsutil cp process_iot.py client.properties $BUCKET
  1. מעתיקים כמה קובצי JAR של תלות למשימת Dataproc לדלי GCS. הספרייה הזו מכילה קובצי JAR שנדרשים להפעלת משימות Spark Streaming עם Kafka, וספריית האימות המנוהלת של Kafka והתלות שלה, שנלקחו מתוך הגדרת מכונת לקוח.
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
  1. שולחים את משימת Spark לאשכול Dataproc.
gcloud dataproc jobs submit pyspark \
    $BUCKET/process_iot.py \
    --project=$PROJECT_ID \
    --region=$REGION \
    --cluster=dataproc-iot \
    --properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
    -- $BOOTSTRAP $BUCKET/client.properties $BUCKET

יומני הרישום של מנהל ההתקן של Spark יודפסו. בנוסף, תוכלו לראות את הטבלאות האלה ביומן במסוף ואת הנתונים שמאוחסנים בדלי GCS.

25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp           |location        |sensor_data              |device_status       |metadata                                    |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+

25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954  |
|sensor-1 |20.475             |
|sensor-2 |20.475             |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138  |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422  |
|sensor-9 |20.405000019073487 |
+---------+-------------------+

8. הסרת המשאבים

פועלים לפי השלבים כדי לנקות את המשאבים אחרי שמסיימים את ה-Codelab.

  1. מוחקים את אשכול Kafka המנוהל, את המכונה הווירטואלית של Publisher GCE ואת אשכול Dataproc.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
  1. מוחקים את רשת המשנה ואת רשת ה-VPC.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
  1. אם אתם לא רוצים יותר להשתמש בנתונים, אתם יכולים למחוק את דלי ה-GCS.
gcloud storage rm --recursive $BUCKET

9. מזל טוב

יצרתם בהצלחה פייפליין של עיבוד נתונים של IoT באמצעות Manage Kafka ו-Dataproc, שעוזר ל-DemoIOT Solutions לקבל תובנות בזמן אמת לגבי הנתונים שפורסמו על ידי המכשירים שלהם.

יצרתם אשכול מנוהל של Kafka, פרסמתם בו אירועי IoT והפעלתם משימת Dataproc שהשתמשה בסטרימינג של Spark כדי לעבד את האירועים האלה בזמן אמת. עכשיו אתם יודעים מהם השלבים העיקריים שנדרשים ליצירת צינורות עיבוד נתונים באמצעות Managed Kafka ו-Dataproc.

מאמרי עזרה